On process exit, read output into buffer to fix ordering

This command

	echo $(/bin/echo -n 1; echo -n 2)

sometimes outputs "21" because we implement this as

	let bufferfill = IoBufferfill::create_opts(...);
	...
	let eval_res = parser.eval_with(...);
	let buffer = IoBufferfill::finish(bufferfill);

i.e. /bin/echo and builtin echo both output to the same buffer; the
builtin does inside parser.eval_with(), and the external process may
or may not output before that, depending on when the FD monitor thread
gets scheduled (to run item_callback).

(Unrelated to that we make sure to consume all available input in
"IoBufferfill::finish(bufferfill)" but that doesn't help with
ordering.)

Fix this by reading all available data from stdout after the child
process has exited.

This means we need to pass the BufferFill down to
process_mark_finished_children().

We don't need to do this for builtins like "fg" or "wait",
because commands that buffer output do not get job control, see
2ca66cff53 (Disable job control inside command substitutions,
2021-07-26).
We also don't need to do it when reaping from reader because there
should be no buffering(?).

fish still deviates from other shells in that it doesn't wait for
it's child's stdout to be closed, meaning that this will behave
non-deterministically.

	fish -c '
	    echo -n $(
	        sh -c " ( for i in \$(seq 10000); do printf .; done ) & "
	    )
	' | wc -c

We should fix that later.

Closes #12018
This commit is contained in:
Johannes Altmanninger
2025-11-18 10:54:54 +01:00
parent 02061be279
commit f1c8e6995d
11 changed files with 101 additions and 23 deletions

View File

@@ -151,7 +151,7 @@ pub fn fg(parser: &Parser, streams: &mut IoStreams, argv: &mut [&wstr]) -> Built
handoff.to_job_group(job.group.as_ref().unwrap());
let resumed = job.resume();
if resumed {
job.continue_job(parser);
job.continue_job(parser, /*block_io=*/ None);
}
if job.is_stopped() {
handoff.save_tty_modes();

View File

@@ -210,7 +210,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
parser,
&job.processes()[dp],
job,
block_io,
block_io.clone(),
deferred_pipes,
&PartialPipes::default(),
true,
@@ -243,7 +243,7 @@ pub fn exec_job(parser: &Parser, job: &Job, block_io: IoChain) -> bool {
}
if !job.is_initially_background() {
job.continue_job(parser);
job.continue_job(parser, Some(&block_io));
}
if job.is_stopped() {

View File

@@ -286,6 +286,11 @@ pub fn add(&self, fd: AutoCloseFd, callback: Callback) -> FdMonitorItemId {
item_id
}
pub fn with_fd(&self, item_id: FdMonitorItemId, cb: impl FnOnce(&AutoCloseFd)) {
let data = self.data.lock().expect("Mutex poisoned!");
cb(&data.items.get(&item_id).unwrap().fd);
}
/// Remove an item from the monitor and return its file descriptor.
/// Note we may remove an item whose fd is currently being waited on in select(); this is
/// considered benign because the underlying item will no longer be present and so its

View File

@@ -355,9 +355,13 @@ pub fn buffer(&self) -> &IoBuffer {
&self.buffer
}
pub fn read_all_available(&self) {
fd_monitor().with_fd(self.item_id, |fd| self.buffer.read_all_available(fd));
}
/// Reset the receiver (possibly closing the write end of the pipe), and complete the fillthread
/// of the buffer. Return the buffer.
pub fn finish(filler: Arc<IoBufferfill>) -> SeparatedBuffer {
pub fn finish(filler: Arc<Self>) -> SeparatedBuffer {
// The io filler is passed in. This typically holds the only instance of the write side of the
// pipe used by the buffer's fillthread (except for that side held by other processes).
// Then allow the buffer to finish.
@@ -435,14 +439,28 @@ pub fn read_once(fd: RawFd, buffer: &mut MutexGuard<'_, SeparatedBuffer>) -> isi
amt
}
pub fn read_all_available(&self, fd: &AutoCloseFd) {
let mut locked_buff = self.0.lock().unwrap();
self.do_read_all_available(fd, &mut locked_buff);
}
fn do_read_all_available(
&self,
fd: &AutoCloseFd,
locked_buff: &mut MutexGuard<'_, SeparatedBuffer>,
) {
// Read any remaining data from the pipe.
while fd.is_valid() && IoBuffer::read_once(fd.as_raw_fd(), &mut *locked_buff) > 0 {
// pass
}
}
/// End the background fillthread operation, and return the buffer, transferring ownership.
/// The read end of the pipe is provided.
pub fn complete_and_take_buffer(&self, fd: AutoCloseFd) -> SeparatedBuffer {
// Read any remaining data from the pipe.
let mut locked_buff = self.0.lock().unwrap();
while fd.is_valid() && IoBuffer::read_once(fd.as_raw_fd(), &mut locked_buff) > 0 {
// pass
}
self.do_read_all_available(&fd, &mut locked_buff);
// Return our buffer, transferring ownership.
let mut result = SeparatedBuffer::new(locked_buff.limit());

View File

@@ -1672,7 +1672,7 @@ fn run_1_job(
profile_item.skipped = pop_result != EndExecutionReason::Ok;
}
job_reap(ctx.parser(), false); // clean up jobs
job_reap(ctx.parser(), false, Some(&self.block_io)); // clean up jobs
pop_result
}

View File

@@ -672,7 +672,7 @@ pub fn eval_node<T: Node>(
return EvalRes::new(ProcStatus::from_signal(sig));
}
job_reap(self, false); // not sure why we reap jobs here
job_reap(self, false, Some(block_io)); // not sure why we reap jobs here
// Start it up
let mut op_ctx = self.context();
@@ -707,7 +707,7 @@ pub fn eval_node<T: Node>(
ScopeGuarding::commit(restore_line_counter);
self.pop_block(scope_block);
job_reap(self, false); // reap again
job_reap(self, false, Some(block_io)); // reap again
let sig = signal_check_cancel();
if sig != 0 {

View File

@@ -25,8 +25,8 @@
use fish_wchar::ToWString;
use libc::{
_SC_CLK_TCK, EXIT_SUCCESS, SIG_DFL, SIG_IGN, SIGABRT, SIGBUS, SIGCONT, SIGFPE, SIGHUP, SIGILL,
SIGINT, SIGKILL, SIGPIPE, SIGQUIT, SIGSEGV, SIGSYS, SIGTTOU, WCONTINUED, WEXITSTATUS,
WIFCONTINUED, WIFEXITED, WIFSIGNALED, WIFSTOPPED, WNOHANG, WTERMSIG, WUNTRACED,
SIGINT, SIGKILL, SIGPIPE, SIGQUIT, SIGSEGV, SIGSYS, SIGTTOU, STDOUT_FILENO, WCONTINUED,
WEXITSTATUS, WIFCONTINUED, WIFEXITED, WIFSIGNALED, WIFSTOPPED, WNOHANG, WTERMSIG, WUNTRACED,
};
use once_cell::sync::Lazy;
#[cfg(not(target_has_atomic = "64"))]
@@ -786,7 +786,7 @@ pub fn posts_job_exit_events(&self) -> bool {
}
/// Run ourselves. Returning once we complete or stop.
pub fn continue_job(&self, parser: &Parser) {
pub fn continue_job(&self, parser: &Parser, block_io: Option<&IoChain>) {
flogf!(
proc_job_run,
"Run job %d (%s), %s, %s",
@@ -806,7 +806,7 @@ pub fn continue_job(&self, parser: &Parser) {
// Wait for the status of our own job to change.
while !fish_is_unwinding_for_exit() && !self.is_stopped() && !self.is_completed() {
process_mark_finished_children(parser, true);
process_mark_finished_children(parser, /*block_ok=*/ true, block_io);
}
if self.is_completed() {
// Set $status only if we are in the foreground and the last process in the job has
@@ -963,13 +963,13 @@ pub fn set_job_control_mode(mode: JobControl) {
/// Notify the user about stopped or terminated jobs, and delete completed jobs from the job list.
/// If `interactive` is set, allow removing interactive jobs; otherwise skip them.
/// Return whether text was printed to stdout.
pub fn job_reap(parser: &Parser, interactive: bool) -> bool {
pub fn job_reap(parser: &Parser, interactive: bool, block_io: Option<&IoChain>) -> bool {
// Early out for the common case that there are no jobs.
if parser.jobs().is_empty() {
return false;
}
process_mark_finished_children(parser, false /* not block_ok */);
process_mark_finished_children(parser, /*block_ok=*/ false, block_io);
process_clean_after_marking(parser, interactive)
}
@@ -1103,7 +1103,7 @@ fn handle_child_status(job: &Job, proc: &Process, status: ProcStatus) {
/// Wait for any process finishing, or receipt of a signal.
pub fn proc_wait_any(parser: &Parser) {
process_mark_finished_children(parser, true /*block_ok*/);
process_mark_finished_children(parser, /*block_ok=*/ true, /*block_io=*/ None);
let is_interactive = parser.scope().is_interactive;
process_clean_after_marking(parser, is_interactive);
}
@@ -1177,7 +1177,7 @@ fn reap_disowned_pids() {
/// See if any reapable processes have exited, and mark them accordingly.
/// \param block_ok if no reapable processes have exited, block until one is (or until we receive a
/// signal).
fn process_mark_finished_children(parser: &Parser, block_ok: bool) {
fn process_mark_finished_children(parser: &Parser, block_ok: bool, block_io: Option<&IoChain>) {
// Get the exit and signal generations of all reapable processes.
// The exit generation tells us if we have an exit; the signal generation allows for detecting
// SIGHUP and SIGINT.
@@ -1260,6 +1260,8 @@ fn process_mark_finished_children(parser: &Parser, block_ok: bool) {
pid,
proc.status().status_value()
);
block_io.map(bufferfill_read_finished_process_output);
} else {
assert!(status.stopped() || status.continued());
flogf!(
@@ -1321,6 +1323,16 @@ fn process_mark_finished_children(parser: &Parser, block_ok: bool) {
reap_disowned_pids();
}
fn bufferfill_read_finished_process_output(block_io: &IoChain) {
let Some(stdout) = block_io.io_for_fd(STDOUT_FILENO) else {
return;
};
let Some(stdout) = stdout.as_bufferfill() else {
return;
};
stdout.read_all_available();
}
/// Generate process_exit events for any completed processes in `j`.
fn generate_process_exit_events(j: &Job, out_evts: &mut Vec<Event>) {
// Historically we have avoided generating events for foreground jobs from event handlers, as an

View File

@@ -25,7 +25,7 @@ fn get_ioport_fd(&self) -> RawFd {
fn prepare_to_select(&mut self) {
// Fire any pending events and reap stray processes, including printing exit status messages.
event::fire_delayed(self.parser);
if job_reap(self.parser, true) {
if job_reap(self.parser, true, None) {
reader_schedule_prompt_repaint();
}
}
@@ -38,7 +38,7 @@ fn select_interrupted(&mut self) {
// Fire any pending events and reap stray processes, including printing exit status messages.
let parser = self.parser;
event::fire_delayed(parser);
if job_reap(parser, true) {
if job_reap(parser, true, None) {
reader_schedule_prompt_repaint();
}

View File

@@ -4865,7 +4865,7 @@ fn exec_prompt(&mut self, full_prompt: bool, final_prompt: bool) {
// Reap jobs but do NOT trigger a repaint.
// This is to prevent infinite loops in case a job from the prompt triggers a repaint.
// See #9796.
job_reap(self.parser, true);
job_reap(self.parser, true, None);
// Some prompt may have requested an exit (#8033).
let exit_current_script = self.parser.libdata().exit_current_script;
@@ -5995,7 +5995,7 @@ fn reader_run_command(parser: &Parser, cmd: &wstr) -> EvalRes {
let time_before = Instant::now();
let eval_res = parser.eval(cmd, &IoChain::new());
job_reap(parser, true);
job_reap(parser, true, None);
// Update the execution duration iff a command is requested for execution
// issue - #4926

View File

@@ -1,6 +1,5 @@
# RUN: %fish %s
# REQUIRES: command -v diff
# REQUIRES: test -z "$CI"
fish_config prompt list | string match -r '^(?:acidhub|disco|nim)$'
# CHECK: acidhub

View File

@@ -0,0 +1,44 @@
# RUN: fish=%fish %fish %s
set -g nproc (
if command -v nproc >/dev/null
nproc
else
sysctl -n hw.logicalcpu
end
)
function run-concurrently
for i in (seq (math "10 * min($nproc, 16)"))
$fish -c $argv[1] &
end
wait
end
run-concurrently '
set t "$(
/bin/echo -n A
echo -n B
)"
test $t = AB
or echo $t
'
run-concurrently '
eval "
/bin/echo -n C
echo -n D
" | read -l t
test $t = CD
or echo $t
'
run-concurrently '
# block/function node output is buffered also
begin
/bin/echo -n E
echo -n F
end |
read -l t
test $t = EF
or echo $t
'