From 2f5260aabd3bc39bdf9afc0a4f9c794d8191c612 Mon Sep 17 00:00:00 2001 From: Peter Ammon Date: Sat, 25 Oct 2025 21:53:31 -0700 Subject: [PATCH] Make a reader-specific thread pool This concerns threads spawned by the reader for tasks like syntax highlighting that may need to perform I/O. These are different from other threads typically because they need to "report back" and have their results handled. Create a dedicated module where this logic can live. This also eliminates the "global" thread pool. --- src/input.rs | 7 ++++--- src/input_common.rs | 5 ++--- src/reader/iothreads.rs | 31 ++++++++++++++++++++++++++++++ src/reader/mod.rs | 1 + src/reader/reader.rs | 8 +++----- src/tests/parser.rs | 3 +-- src/threads.rs | 42 +++++------------------------------------ 7 files changed, 47 insertions(+), 50 deletions(-) create mode 100644 src/reader/iothreads.rs diff --git a/src/input.rs b/src/input.rs index 8f91e4b0d..76fe04c75 100644 --- a/src/input.rs +++ b/src/input.rs @@ -10,10 +10,11 @@ use crate::key::{self, Key, Modifiers, canonicalize_raw_escapes, ctrl}; use crate::proc::job_reap; use crate::reader::{ - Reader, reader_reading_interrupted, reader_reset_interrupted, reader_schedule_prompt_repaint, + Reader, iothreads, reader_reading_interrupted, reader_reset_interrupted, + reader_schedule_prompt_repaint, }; use crate::signal::signal_clear_cancel; -use crate::threads::{assert_is_main_thread, iothread_service_main}; +use crate::threads::assert_is_main_thread; use crate::wchar::prelude::*; use once_cell::sync::Lazy; use std::mem; @@ -419,7 +420,7 @@ fn uvar_change_notified(&mut self) { } fn ioport_notified(&mut self) { - iothread_service_main(self); + iothreads::invoke_completions(self); } fn paste_start_buffering(&mut self) { diff --git a/src/input_common.rs b/src/input_common.rs index b44051486..bd4834e91 100644 --- a/src/input_common.rs +++ b/src/input_common.rs @@ -9,8 +9,7 @@ self, Key, Modifiers, ViewportPosition, alt, canonicalize_control_char, canonicalize_keyed_control_char, char_to_symbol, function_key, shift, }; -use crate::reader::reader_test_and_clear_interrupted; -use crate::threads::iothread_port; +use crate::reader::{iothreads, reader_test_and_clear_interrupted}; use crate::tty_handoff::{ SCROLL_CONTENT_UP_TERMINFO_CODE, XTVERSION, maybe_set_kitty_keyboard_capability, maybe_set_scroll_content_up_capability, @@ -566,7 +565,7 @@ fn next_input_event(in_fd: RawFd, timeout: Timeout) -> InputEventTrigger { fdset.add(in_fd); // Add the completion ioport. - let ioport_fd = iothread_port(); + let ioport_fd = iothreads::completion_port(); fdset.add(ioport_fd); // Get the uvar notifier fd (possibly none). diff --git a/src/reader/iothreads.rs b/src/reader/iothreads.rs new file mode 100644 index 000000000..5810426cc --- /dev/null +++ b/src/reader/iothreads.rs @@ -0,0 +1,31 @@ +//! A thread pool for handling operations related to the interactive reader +//! which might block, such as file I/O or completions. + +use crate::threads::ThreadPool; +use std::sync::{Arc, OnceLock}; + +/// The maximum number of I/O threads in this pool. +const IO_MAX_THREADS: usize = 16; + +/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for +/// completions, etc. +static THREAD_POOL: OnceLock> = OnceLock::new(); + +/// Returns a reference to the singleton thread pool. +#[inline] +pub fn thread_pool() -> &'static Arc { + // Soft max of 1 so we keep a single thread hanging around for a time to + // amortize thread creation costs as the user ctypes, etc. + THREAD_POOL.get_or_init(|| ThreadPool::new(1, IO_MAX_THREADS)) +} + +/// Return the read fd of the singleton ThreadPool event signaller. +/// This may be used with `poll()` or `select()` to multiplex iothread completions with other events. +pub fn completion_port() -> i32 { + thread_pool().event_signaller_read_port() +} + +/// Invoke completions for the given reader. +pub fn invoke_completions(reader: &mut crate::reader::Reader) { + thread_pool().invoke_completions(reader); +} diff --git a/src/reader/mod.rs b/src/reader/mod.rs index 6b4538afb..82e839a5f 100644 --- a/src/reader/mod.rs +++ b/src/reader/mod.rs @@ -1,5 +1,6 @@ mod history_search; +pub mod iothreads; #[allow(clippy::module_inception)] pub mod reader; diff --git a/src/reader/reader.rs b/src/reader/reader.rs index 102127581..43bebcb54 100644 --- a/src/reader/reader.rs +++ b/src/reader/reader.rs @@ -48,6 +48,7 @@ use errno::{Errno, errno}; use super::history_search::{ReaderHistorySearch, SearchMode, smartcase_flags}; +use super::iothreads::thread_pool as io_thread_pool; use crate::abbrs::abbrs_match; use crate::ast::{self, Kind, is_same_node}; use crate::builtins::shared::ErrorCode; @@ -135,10 +136,7 @@ use crate::termsize::{termsize_invalidate_tty, termsize_last, termsize_update}; use crate::text_face::TextFace; use crate::text_face::parse_text_face; -use crate::threads::{ - Debounce, assert_is_background_thread, assert_is_main_thread, io_thread_pool, - iothread_service_main_with_timeout, -}; +use crate::threads::{Debounce, assert_is_background_thread, assert_is_main_thread}; use crate::tokenizer::quote_end; use crate::tokenizer::variable_assignment_equals_pos; use crate::tokenizer::{ @@ -5284,7 +5282,7 @@ fn finish_highlighting_before_exec(&mut self) { let deadline = now + HIGHLIGHT_TIMEOUT_FOR_EXECUTION; while now < deadline { let timeout = deadline - now; - iothread_service_main_with_timeout(self, timeout); + io_thread_pool().invoke_completions_with_timeout(self, timeout); // Note iothread_service_main_with_timeout will reentrantly modify us, // by invoking a completion. diff --git a/src/tests/parser.rs b/src/tests/parser.rs index 6b010c2cd..f37bf3030 100644 --- a/src/tests/parser.rs +++ b/src/tests/parser.rs @@ -11,7 +11,6 @@ use crate::reader::{fake_scoped_reader, reader_reset_interrupted}; use crate::signal::{signal_clear_cancel, signal_reset_handlers, signal_set_handlers}; use crate::tests::prelude::*; -use crate::threads::iothread_perform; use crate::wchar::prelude::*; use crate::wcstringutil::join_strings; use libc::SIGINT; @@ -703,7 +702,7 @@ fn test_1_cancellation(parser: &Parser, src: &wstr) { let delay = Duration::from_millis(100); #[allow(clippy::unnecessary_cast)] let thread = unsafe { libc::pthread_self() } as usize; - iothread_perform(move || { + std::thread::spawn(move || { // Wait a while and then SIGINT the main thread. std::thread::sleep(delay); unsafe { diff --git a/src/threads.rs b/src/threads.rs index 5cb0bb58c..d5b926eeb 100644 --- a/src/threads.rs +++ b/src/threads.rs @@ -21,17 +21,10 @@ impl FloggableDebug for std::thread::ThreadId {} /// This allows us to notice when we've forked. static IS_FORKED_PROC: AtomicBool = AtomicBool::new(false); -/// Maximum number of threads for the IO thread pool. -const IO_MAX_THREADS: usize = 1024; - /// How long an idle [`ThreadPool`] thread will wait for work (against the condition variable) /// before exiting. const IO_WAIT_FOR_WORK_DURATION: Duration = Duration::from_millis(500); -/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for -/// completions, etc. -static IO_THREAD_POOL: OnceLock> = OnceLock::new(); - /// A [`ThreadPool`] work request. type WorkItem = Box; @@ -57,10 +50,6 @@ extern "C" fn child_post_fork() { let result = libc::pthread_atfork(None, None, Some(child_post_fork)); assert_eq!(result, 0, "pthread_atfork() failure: {}", errno::errno()); } - - IO_THREAD_POOL - .set(ThreadPool::new(1, IO_MAX_THREADS)) - .expect("IO_THREAD_POOL has already been initialized!"); } #[inline(always)] @@ -359,6 +348,11 @@ pub fn invoke_completions(&self, ctx: &mut Reader) { } } + /// Return the event signaller read port. + pub fn event_signaller_read_port(&self) -> i32 { + self.event_signaller.read_fd() + } + /// Invoke completions, waiting up to `timeout` for completions to be available. pub fn invoke_completions_with_timeout(&self, ctx: &mut Reader, timeout: Duration) { let timeout = fd_readable_set::Timeout::Duration(timeout); @@ -468,32 +462,6 @@ fn dequeue_work_or_commit_to_exit(&self) -> Option { } } -/// Returns a reference to the singleton thread pool. -#[inline] -pub fn io_thread_pool() -> &'static Arc { - IO_THREAD_POOL.get().unwrap() -} - -/// Enqueues work on the IO thread pool singleton. -pub fn iothread_perform(f: impl FnOnce() + 'static + Send) { - let thread_pool = io_thread_pool(); - thread_pool.perform(f); -} - -/// Return the read fd of the singleton ThreadPool event signaller. -/// This may be used with `poll()` or `select()` to multiplex iothread completions with other events. -pub fn iothread_port() -> i32 { - io_thread_pool().event_signaller.read_fd() -} - -pub fn iothread_service_main_with_timeout(ctx: &mut Reader, timeout: Duration) { - io_thread_pool().invoke_completions_with_timeout(ctx, timeout); -} - -pub fn iothread_service_main(ctx: &mut Reader) { - io_thread_pool().invoke_completions(ctx); -} - /// `Debounce` is a simple class which executes one function on a background thread while enqueuing /// at most one more. Subsequent execution requests overwrite the enqueued one. It takes an optional /// timeout; if a handler does not finish within the timeout then a new thread is spawned to service