Migrate ThreadPool's Arc to the outside

This will simplify debouncing and allowing for multiple pools.
This commit is contained in:
Peter Ammon
2025-10-19 12:08:24 -07:00
parent 332712866c
commit 43731c88bd

View File

@@ -30,7 +30,7 @@ impl FloggableDebug for std::thread::ThreadId {}
/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for
/// completions, etc.
static IO_THREAD_POOL: OnceLock<ThreadPool> = OnceLock::new();
static IO_THREAD_POOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
/// A [`ThreadPool`] work request.
type WorkItem = Box<dyn FnOnce() + 'static + Send>;
@@ -235,19 +235,12 @@ struct ThreadPoolProtected {
pub waiting_threads: usize,
}
/// Data behind an [`Arc`] to share between the [`ThreadPool`] and [`WorkerThread`] instances.
#[derive(Default)]
struct ThreadPoolShared {
pub struct ThreadPool {
/// The mutex to access shared state between [`ThreadPool`] and [`WorkerThread`] instances. This
/// is accessed both standalone and via [`cond_var`](Self::cond_var).
mutex: Mutex<ThreadPoolProtected>,
shared: Mutex<ThreadPoolProtected>,
/// The condition variable used to wake up waiting threads. This is tied to [`mutex`](Self::mutex).
cond_var: std::sync::Condvar,
}
pub struct ThreadPool {
/// The data which needs to be shared with worker threads.
shared: Arc<ThreadPoolShared>,
/// The queue of completions. This is typically executed on the "main thread".
completion_queue: Mutex<Vec<DebounceCallback>>,
/// An event signaller used for completions and queued main thread requests.
@@ -273,14 +266,15 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl ThreadPool {
/// Construct a new `ThreadPool` instance with the specified min and max num of threads.
pub fn new(soft_min_threads: usize, max_threads: usize) -> Self {
ThreadPool {
pub fn new(soft_min_threads: usize, max_threads: usize) -> Arc<Self> {
Arc::new(ThreadPool {
shared: Default::default(),
cond_var: Default::default(),
completion_queue: Default::default(),
event_signaller: FdEventSignaller::new(),
soft_min_threads,
max_threads,
}
})
}
/// Enqueue a new work item onto the thread pool.
@@ -289,7 +283,11 @@ pub fn new(soft_min_threads: usize, max_threads: usize) -> Self {
/// set, the thread limit may be disregarded if extant threads are busy.
///
/// Returns the number of threads that were alive when the work item was enqueued.
pub fn perform<F: FnOnce() + 'static + Send>(&self, func: F, cant_wait: bool) -> usize {
pub fn perform<F: FnOnce() + 'static + Send>(
self: &Arc<Self>,
func: F,
cant_wait: bool,
) -> usize {
let work_item = Box::new(func);
enum ThreadAction {
None,
@@ -299,7 +297,7 @@ enum ThreadAction {
let local_thread_count;
let thread_action = {
let mut data = self.shared.mutex.lock().expect("Mutex poisoned!");
let mut data = self.shared.lock().expect("Mutex poisoned!");
local_thread_count = data.total_threads;
data.request_queue.push_back(work_item);
FLOG!(
@@ -328,7 +326,7 @@ enum ThreadAction {
ThreadAction::Wake => {
// Wake a thread if we decided to do so.
FLOG!(iothread, "notifying thread ", std::thread::current().id());
self.shared.cond_var.notify_one();
self.cond_var.notify_one();
}
ThreadAction::Spawn => {
// Spawn a thread. If this fails, it means there are already a bunch of worker
@@ -340,11 +338,7 @@ enum ThreadAction {
FLOG!(iothread, "pthread spawned");
} else {
// We failed to spawn a thread; decrement the thread count.
self.shared
.mutex
.lock()
.expect("Mutex poisoned!")
.total_threads -= 1;
self.shared.lock().expect("Mutex poisoned!").total_threads -= 1;
}
}
}
@@ -353,12 +347,12 @@ enum ThreadAction {
}
/// Attempt to spawn a new worker thread.
fn spawn_thread(&self) -> bool {
let shared = Arc::clone(&self.shared);
fn spawn_thread(self: &Arc<Self>) -> bool {
let pool = Arc::clone(self);
let soft_min_threads = self.soft_min_threads;
self::spawn(move || {
let worker = WorkerThread {
shared,
pool,
soft_min_threads,
};
@@ -388,14 +382,7 @@ pub fn invoke_completions_with_timeout(&self, ctx: &mut Reader, timeout: Duratio
/// Does nasty polling via select(); only used for testing.
#[cfg(test)]
pub(crate) fn drain_all(&self, ctx: &mut Reader) {
while self
.shared
.mutex
.lock()
.expect("Mutex poisoned!")
.total_threads
> 0
{
while self.shared.lock().expect("Mutex poisoned!").total_threads > 0 {
self.invoke_completions_with_timeout(ctx, Duration::from_millis(1000));
}
}
@@ -429,8 +416,8 @@ pub fn get(&self) -> &T {
}
pub struct WorkerThread {
/// The data shared with the [`ThreadPool`].
shared: Arc<ThreadPoolShared>,
/// The [`ThreadPool`].
pool: Arc<ThreadPool>,
/// The soft min number of threads for the associated [`ThreadPool`].
soft_min_threads: usize,
}
@@ -461,7 +448,7 @@ fn run(mut self) {
/// Dequeue a work item (perhaps waiting on the condition variable) or commit to exiting by
/// reducing the active thread count.
fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
let mut data = self.shared.mutex.lock().expect("Mutex poisoned!");
let mut data = self.pool.shared.lock().expect("Mutex poisoned!");
// If the queue is empty, check to see if we should wait. We should wait if our exiting
// would drop us below our soft thread count minimum.
@@ -471,7 +458,7 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
{
data.waiting_threads += 1;
data = self
.shared
.pool
.cond_var
.wait_timeout(data, IO_WAIT_FOR_WORK_DURATION)
.expect("Mutex poisoned!")
@@ -496,7 +483,7 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
/// Returns a reference to the singleton thread pool.
#[inline]
fn borrow_io_thread_pool() -> &'static ThreadPool {
fn borrow_io_thread_pool() -> &'static Arc<ThreadPool> {
IO_THREAD_POOL.get().unwrap()
}