Make a reader-specific thread pool

This concerns threads spawned by the reader for tasks like syntax
highlighting that may need to perform I/O.

These are different from other threads typically because they need to
"report back" and have their results handled.

Create a dedicated module where this logic can live.
This also eliminates the "global" thread pool.
This commit is contained in:
Peter Ammon
2025-10-25 21:53:31 -07:00
parent eca19006ad
commit 2f5260aabd
7 changed files with 47 additions and 50 deletions

View File

@@ -10,10 +10,11 @@
use crate::key::{self, Key, Modifiers, canonicalize_raw_escapes, ctrl};
use crate::proc::job_reap;
use crate::reader::{
Reader, reader_reading_interrupted, reader_reset_interrupted, reader_schedule_prompt_repaint,
Reader, iothreads, reader_reading_interrupted, reader_reset_interrupted,
reader_schedule_prompt_repaint,
};
use crate::signal::signal_clear_cancel;
use crate::threads::{assert_is_main_thread, iothread_service_main};
use crate::threads::assert_is_main_thread;
use crate::wchar::prelude::*;
use once_cell::sync::Lazy;
use std::mem;
@@ -419,7 +420,7 @@ fn uvar_change_notified(&mut self) {
}
fn ioport_notified(&mut self) {
iothread_service_main(self);
iothreads::invoke_completions(self);
}
fn paste_start_buffering(&mut self) {

View File

@@ -9,8 +9,7 @@
self, Key, Modifiers, ViewportPosition, alt, canonicalize_control_char,
canonicalize_keyed_control_char, char_to_symbol, function_key, shift,
};
use crate::reader::reader_test_and_clear_interrupted;
use crate::threads::iothread_port;
use crate::reader::{iothreads, reader_test_and_clear_interrupted};
use crate::tty_handoff::{
SCROLL_CONTENT_UP_TERMINFO_CODE, XTVERSION, maybe_set_kitty_keyboard_capability,
maybe_set_scroll_content_up_capability,
@@ -566,7 +565,7 @@ fn next_input_event(in_fd: RawFd, timeout: Timeout) -> InputEventTrigger {
fdset.add(in_fd);
// Add the completion ioport.
let ioport_fd = iothread_port();
let ioport_fd = iothreads::completion_port();
fdset.add(ioport_fd);
// Get the uvar notifier fd (possibly none).

31
src/reader/iothreads.rs Normal file
View File

@@ -0,0 +1,31 @@
//! A thread pool for handling operations related to the interactive reader
//! which might block, such as file I/O or completions.
use crate::threads::ThreadPool;
use std::sync::{Arc, OnceLock};
/// The maximum number of I/O threads in this pool.
const IO_MAX_THREADS: usize = 16;
/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for
/// completions, etc.
static THREAD_POOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
/// Returns a reference to the singleton thread pool.
#[inline]
pub fn thread_pool() -> &'static Arc<ThreadPool> {
// Soft max of 1 so we keep a single thread hanging around for a time to
// amortize thread creation costs as the user ctypes, etc.
THREAD_POOL.get_or_init(|| ThreadPool::new(1, IO_MAX_THREADS))
}
/// Return the read fd of the singleton ThreadPool event signaller.
/// This may be used with `poll()` or `select()` to multiplex iothread completions with other events.
pub fn completion_port() -> i32 {
thread_pool().event_signaller_read_port()
}
/// Invoke completions for the given reader.
pub fn invoke_completions(reader: &mut crate::reader::Reader) {
thread_pool().invoke_completions(reader);
}

View File

@@ -1,5 +1,6 @@
mod history_search;
pub mod iothreads;
#[allow(clippy::module_inception)]
pub mod reader;

View File

@@ -48,6 +48,7 @@
use errno::{Errno, errno};
use super::history_search::{ReaderHistorySearch, SearchMode, smartcase_flags};
use super::iothreads::thread_pool as io_thread_pool;
use crate::abbrs::abbrs_match;
use crate::ast::{self, Kind, is_same_node};
use crate::builtins::shared::ErrorCode;
@@ -135,10 +136,7 @@
use crate::termsize::{termsize_invalidate_tty, termsize_last, termsize_update};
use crate::text_face::TextFace;
use crate::text_face::parse_text_face;
use crate::threads::{
Debounce, assert_is_background_thread, assert_is_main_thread, io_thread_pool,
iothread_service_main_with_timeout,
};
use crate::threads::{Debounce, assert_is_background_thread, assert_is_main_thread};
use crate::tokenizer::quote_end;
use crate::tokenizer::variable_assignment_equals_pos;
use crate::tokenizer::{
@@ -5284,7 +5282,7 @@ fn finish_highlighting_before_exec(&mut self) {
let deadline = now + HIGHLIGHT_TIMEOUT_FOR_EXECUTION;
while now < deadline {
let timeout = deadline - now;
iothread_service_main_with_timeout(self, timeout);
io_thread_pool().invoke_completions_with_timeout(self, timeout);
// Note iothread_service_main_with_timeout will reentrantly modify us,
// by invoking a completion.

View File

@@ -11,7 +11,6 @@
use crate::reader::{fake_scoped_reader, reader_reset_interrupted};
use crate::signal::{signal_clear_cancel, signal_reset_handlers, signal_set_handlers};
use crate::tests::prelude::*;
use crate::threads::iothread_perform;
use crate::wchar::prelude::*;
use crate::wcstringutil::join_strings;
use libc::SIGINT;
@@ -703,7 +702,7 @@ fn test_1_cancellation(parser: &Parser, src: &wstr) {
let delay = Duration::from_millis(100);
#[allow(clippy::unnecessary_cast)]
let thread = unsafe { libc::pthread_self() } as usize;
iothread_perform(move || {
std::thread::spawn(move || {
// Wait a while and then SIGINT the main thread.
std::thread::sleep(delay);
unsafe {

View File

@@ -21,17 +21,10 @@ impl FloggableDebug for std::thread::ThreadId {}
/// This allows us to notice when we've forked.
static IS_FORKED_PROC: AtomicBool = AtomicBool::new(false);
/// Maximum number of threads for the IO thread pool.
const IO_MAX_THREADS: usize = 1024;
/// How long an idle [`ThreadPool`] thread will wait for work (against the condition variable)
/// before exiting.
const IO_WAIT_FOR_WORK_DURATION: Duration = Duration::from_millis(500);
/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for
/// completions, etc.
static IO_THREAD_POOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
/// A [`ThreadPool`] work request.
type WorkItem = Box<dyn FnOnce() + 'static + Send>;
@@ -57,10 +50,6 @@ extern "C" fn child_post_fork() {
let result = libc::pthread_atfork(None, None, Some(child_post_fork));
assert_eq!(result, 0, "pthread_atfork() failure: {}", errno::errno());
}
IO_THREAD_POOL
.set(ThreadPool::new(1, IO_MAX_THREADS))
.expect("IO_THREAD_POOL has already been initialized!");
}
#[inline(always)]
@@ -359,6 +348,11 @@ pub fn invoke_completions(&self, ctx: &mut Reader) {
}
}
/// Return the event signaller read port.
pub fn event_signaller_read_port(&self) -> i32 {
self.event_signaller.read_fd()
}
/// Invoke completions, waiting up to `timeout` for completions to be available.
pub fn invoke_completions_with_timeout(&self, ctx: &mut Reader, timeout: Duration) {
let timeout = fd_readable_set::Timeout::Duration(timeout);
@@ -468,32 +462,6 @@ fn dequeue_work_or_commit_to_exit(&self) -> Option<WorkItem> {
}
}
/// Returns a reference to the singleton thread pool.
#[inline]
pub fn io_thread_pool() -> &'static Arc<ThreadPool> {
IO_THREAD_POOL.get().unwrap()
}
/// Enqueues work on the IO thread pool singleton.
pub fn iothread_perform(f: impl FnOnce() + 'static + Send) {
let thread_pool = io_thread_pool();
thread_pool.perform(f);
}
/// Return the read fd of the singleton ThreadPool event signaller.
/// This may be used with `poll()` or `select()` to multiplex iothread completions with other events.
pub fn iothread_port() -> i32 {
io_thread_pool().event_signaller.read_fd()
}
pub fn iothread_service_main_with_timeout(ctx: &mut Reader, timeout: Duration) {
io_thread_pool().invoke_completions_with_timeout(ctx, timeout);
}
pub fn iothread_service_main(ctx: &mut Reader) {
io_thread_pool().invoke_completions(ctx);
}
/// `Debounce` is a simple class which executes one function on a background thread while enqueuing
/// at most one more. Subsequent execution requests overwrite the enqueued one. It takes an optional
/// timeout; if a handler does not finish within the timeout then a new thread is spawned to service