Background threads to use a separate pool from reader

Sometimes we need to spawn threads to service internal processes. Make
this use a separate thread pool from the pool used for interactive tasks
(like detecting which arguments are files for syntax highlighting).
This commit is contained in:
Peter Ammon
2025-10-19 13:33:58 -07:00
parent e299b71560
commit e20b06df1a

View File

@@ -41,7 +41,7 @@
};
use crate::reader::{reader_run_count, safe_restore_term_mode};
use crate::redirection::{Dup2List, dup2_list_resolve_chain};
use crate::threads::{iothread_perform_cant_wait, is_forked_child};
use crate::threads::{ThreadPool, is_forked_child};
use crate::trace::trace_if_enabled_with_args;
use crate::tty_handoff::TtyHandoff;
use crate::wchar::prelude::*;
@@ -60,8 +60,20 @@
use std::num::NonZeroU32;
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
use std::slice;
use std::sync::atomic::Ordering;
use std::sync::{Arc, atomic::AtomicUsize};
use std::sync::{
Arc, OnceLock,
atomic::{AtomicUsize, Ordering},
};
/// The singleton shared exec thread pool.
/// This is used to write the output of internal processes (e.g. builtins)
/// to their target fds.
/// TODO: this IO could be multiplexed using FdMonitor.
fn exec_thread_pool() -> &'static Arc<ThreadPool> {
static EXEC_THREAD_POOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
// Use an unbounded queue because otherwise we risk deadlock.
EXEC_THREAD_POOL.get_or_init(|| ThreadPool::new(1, usize::MAX))
}
/// Execute the processes specified by `j` in the parser \p.
/// On a true return, the job was successfully launched and the parser will take responsibility for
@@ -601,30 +613,33 @@ fn skip_err(&self) -> bool {
// builtin_run provide this directly, rather than setting it in the process.
f.success_status = p.status();
iothread_perform_cant_wait(move || {
let mut status = f.success_status;
if !f.skip_out() {
if let Err(err) = write_loop(&f.src_outfd, &f.outdata) {
if err.raw_os_error() != Some(EPIPE) {
perror("write");
}
if status.is_success() {
status = ProcStatus::from_exit_code(1);
exec_thread_pool().perform(
move || {
let mut status = f.success_status;
if !f.skip_out() {
if let Err(err) = write_loop(&f.src_outfd, &f.outdata) {
if err.raw_os_error() != Some(EPIPE) {
perror("write");
}
if status.is_success() {
status = ProcStatus::from_exit_code(1);
}
}
}
}
if !f.skip_err() {
if let Err(err) = write_loop(&f.src_errfd, &f.errdata) {
if err.raw_os_error() != Some(EPIPE) {
perror("write");
}
if status.is_success() {
status = ProcStatus::from_exit_code(1);
if !f.skip_err() {
if let Err(err) = write_loop(&f.src_errfd, &f.errdata) {
if err.raw_os_error() != Some(EPIPE) {
perror("write");
}
if status.is_success() {
status = ProcStatus::from_exit_code(1);
}
}
}
}
f.internal_proc.mark_exited(status);
});
f.internal_proc.mark_exited(status);
},
true,
);
}
/// If `outdata` or `errdata` are both empty, then mark the process as completed immediately.