diff --git a/src/threads.rs b/src/threads.rs index f037f76c0..6a990b7f7 100644 --- a/src/threads.rs +++ b/src/threads.rs @@ -30,7 +30,7 @@ impl FloggableDebug for std::thread::ThreadId {} /// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for /// completions, etc. -static IO_THREAD_POOL: OnceLock = OnceLock::new(); +static IO_THREAD_POOL: OnceLock> = OnceLock::new(); /// A [`ThreadPool`] work request. type WorkItem = Box; @@ -235,19 +235,12 @@ struct ThreadPoolProtected { pub waiting_threads: usize, } -/// Data behind an [`Arc`] to share between the [`ThreadPool`] and [`WorkerThread`] instances. -#[derive(Default)] -struct ThreadPoolShared { +pub struct ThreadPool { /// The mutex to access shared state between [`ThreadPool`] and [`WorkerThread`] instances. This /// is accessed both standalone and via [`cond_var`](Self::cond_var). - mutex: Mutex, + shared: Mutex, /// The condition variable used to wake up waiting threads. This is tied to [`mutex`](Self::mutex). cond_var: std::sync::Condvar, -} - -pub struct ThreadPool { - /// The data which needs to be shared with worker threads. - shared: Arc, /// The queue of completions. This is typically executed on the "main thread". completion_queue: Mutex>, /// An event signaller used for completions and queued main thread requests. @@ -273,14 +266,15 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { impl ThreadPool { /// Construct a new `ThreadPool` instance with the specified min and max num of threads. - pub fn new(soft_min_threads: usize, max_threads: usize) -> Self { - ThreadPool { + pub fn new(soft_min_threads: usize, max_threads: usize) -> Arc { + Arc::new(ThreadPool { shared: Default::default(), + cond_var: Default::default(), completion_queue: Default::default(), event_signaller: FdEventSignaller::new(), soft_min_threads, max_threads, - } + }) } /// Enqueue a new work item onto the thread pool. @@ -289,7 +283,11 @@ pub fn new(soft_min_threads: usize, max_threads: usize) -> Self { /// set, the thread limit may be disregarded if extant threads are busy. /// /// Returns the number of threads that were alive when the work item was enqueued. - pub fn perform(&self, func: F, cant_wait: bool) -> usize { + pub fn perform( + self: &Arc, + func: F, + cant_wait: bool, + ) -> usize { let work_item = Box::new(func); enum ThreadAction { None, @@ -299,7 +297,7 @@ enum ThreadAction { let local_thread_count; let thread_action = { - let mut data = self.shared.mutex.lock().expect("Mutex poisoned!"); + let mut data = self.shared.lock().expect("Mutex poisoned!"); local_thread_count = data.total_threads; data.request_queue.push_back(work_item); FLOG!( @@ -328,7 +326,7 @@ enum ThreadAction { ThreadAction::Wake => { // Wake a thread if we decided to do so. FLOG!(iothread, "notifying thread ", std::thread::current().id()); - self.shared.cond_var.notify_one(); + self.cond_var.notify_one(); } ThreadAction::Spawn => { // Spawn a thread. If this fails, it means there are already a bunch of worker @@ -340,11 +338,7 @@ enum ThreadAction { FLOG!(iothread, "pthread spawned"); } else { // We failed to spawn a thread; decrement the thread count. - self.shared - .mutex - .lock() - .expect("Mutex poisoned!") - .total_threads -= 1; + self.shared.lock().expect("Mutex poisoned!").total_threads -= 1; } } } @@ -353,12 +347,12 @@ enum ThreadAction { } /// Attempt to spawn a new worker thread. - fn spawn_thread(&self) -> bool { - let shared = Arc::clone(&self.shared); + fn spawn_thread(self: &Arc) -> bool { + let pool = Arc::clone(self); let soft_min_threads = self.soft_min_threads; self::spawn(move || { let worker = WorkerThread { - shared, + pool, soft_min_threads, }; @@ -388,14 +382,7 @@ pub fn invoke_completions_with_timeout(&self, ctx: &mut Reader, timeout: Duratio /// Does nasty polling via select(); only used for testing. #[cfg(test)] pub(crate) fn drain_all(&self, ctx: &mut Reader) { - while self - .shared - .mutex - .lock() - .expect("Mutex poisoned!") - .total_threads - > 0 - { + while self.shared.lock().expect("Mutex poisoned!").total_threads > 0 { self.invoke_completions_with_timeout(ctx, Duration::from_millis(1000)); } } @@ -429,8 +416,8 @@ pub fn get(&self) -> &T { } pub struct WorkerThread { - /// The data shared with the [`ThreadPool`]. - shared: Arc, + /// The [`ThreadPool`]. + pool: Arc, /// The soft min number of threads for the associated [`ThreadPool`]. soft_min_threads: usize, } @@ -461,7 +448,7 @@ fn run(mut self) { /// Dequeue a work item (perhaps waiting on the condition variable) or commit to exiting by /// reducing the active thread count. fn dequeue_work_or_commit_to_exit(&mut self) -> Option { - let mut data = self.shared.mutex.lock().expect("Mutex poisoned!"); + let mut data = self.pool.shared.lock().expect("Mutex poisoned!"); // If the queue is empty, check to see if we should wait. We should wait if our exiting // would drop us below our soft thread count minimum. @@ -471,7 +458,7 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option { { data.waiting_threads += 1; data = self - .shared + .pool .cond_var .wait_timeout(data, IO_WAIT_FOR_WORK_DURATION) .expect("Mutex poisoned!") @@ -496,7 +483,7 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option { /// Returns a reference to the singleton thread pool. #[inline] -fn borrow_io_thread_pool() -> &'static ThreadPool { +fn borrow_io_thread_pool() -> &'static Arc { IO_THREAD_POOL.get().unwrap() }