Move MAIN_THREAD_QUEUE into ThreadPool

Continue to remove globals and improve ThreadPool testability.
This commit is contained in:
Peter Ammon
2025-10-13 15:45:49 -07:00
parent 2b9967bf01
commit b0d643c4ce

View File

@@ -43,12 +43,6 @@ unsafe impl<T> Send for ForceSend<T> {}
#[allow(clippy::type_complexity)]
type DebounceCallback = ForceSend<Box<dyn FnOnce(&mut Reader) + 'static>>;
/// The queue of [`WorkItem`]s to be executed on the main thread. This is read from in
/// `iothread_service_main()`.
///
/// Since the queue is synchronized, items don't need to implement `Send`.
static MAIN_THREAD_QUEUE: Mutex<Vec<DebounceCallback>> = Mutex::new(Vec::new());
/// Initialize some global static variables. Must be called at startup from the main thread.
pub fn init() {
MAIN_THREAD_ID
@@ -258,7 +252,12 @@ struct ThreadPoolShared {
pub struct ThreadPool {
/// The data which needs to be shared with worker threads.
shared: Arc<ThreadPoolShared>,
/// The queue of completions. This is typically executed on the "main thread".
completion_queue: Mutex<Vec<DebounceCallback>>,
/// An event signaller used for completions and queued main thread requests.
/// Note the usage order here matters:
/// 1. To enqueue a completion, first push it onto the queue, then post the event signaller.
/// 2. To service completions, first consume the event signaller, then process the queue.
event_signaller: FdEventSignaller,
/// The minimum number of threads that will be kept waiting even when idle in the pool.
soft_min_threads: usize,
@@ -281,6 +280,7 @@ impl ThreadPool {
pub fn new(soft_min_threads: usize, max_threads: usize) -> Self {
ThreadPool {
shared: Default::default(),
completion_queue: Default::default(),
event_signaller: FdEventSignaller::new(),
soft_min_threads,
max_threads,
@@ -509,7 +509,7 @@ pub fn iothread_service_main(ctx: &mut Reader) {
let pool = borrow_io_thread_pool();
pool.event_signaller.try_consume();
let queue = std::mem::take(&mut *MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!"));
let queue = std::mem::take(&mut *pool.completion_queue.lock().expect("Mutex poisoned!"));
// Perform each completion in order.
for callback in queue {
@@ -630,8 +630,9 @@ pub fn perform_with_completion<H, R, C>(&self, handler: H, completion: C) -> Non
let completion = completion_wrapper;
(completion.0)(ctx, result);
}));
MAIN_THREAD_QUEUE.lock().unwrap().push(callback);
borrow_io_thread_pool().event_signaller.post();
let pool = borrow_io_thread_pool();
pool.completion_queue.lock().unwrap().push(callback);
pool.event_signaller.post();
});
self.perform_inner(work_item)
}