Move some iothread functions into ThreadPool

Continue to get away from singletons.
This commit is contained in:
Peter Ammon
2025-10-13 16:31:57 -07:00
parent b0d643c4ce
commit 332712866c

View File

@@ -2,6 +2,7 @@
//! ported directly from the cpp code so we can use rust threads instead of using pthreads.
use crate::fd_monitor::FdEventSignaller;
use crate::fd_readable_set;
use crate::flog::{FLOG, FloggableDebug};
use crate::reader::Reader;
use std::marker::PhantomData;
@@ -153,11 +154,7 @@ fn panic_is_forked_child() {
}
}
/// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a
/// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe
/// an unsafe version that doesn't do any lifetime checking akin to
/// `spawn_unchecked()`[std::thread::Builder::spawn_unchecked], which is a nightly-only feature.
///
/// Spawn a new thread to run the given callback.
/// Returns a boolean indicating whether or not the thread was successfully launched. Failure here
/// is not dependent on the passed callback and implies a system error (likely insufficient
/// resources).
@@ -188,7 +185,6 @@ pub fn spawn<F: FnOnce() + Send + 'static>(callback: F) -> bool {
// extant requests. So we can ignore failure with some confidence.
// We don't have to port the PTHREAD_CREATE_DETACHED logic. Rust threads are detached
// automatically if the returned join handle is dropped.
let result = match std::thread::Builder::new().spawn(callback) {
Ok(handle) => {
let thread_id = thread_id();
@@ -295,10 +291,6 @@ pub fn new(soft_min_threads: usize, max_threads: usize) -> Self {
/// Returns the number of threads that were alive when the work item was enqueued.
pub fn perform<F: FnOnce() + 'static + Send>(&self, func: F, cant_wait: bool) -> usize {
let work_item = Box::new(func);
self.perform_inner(work_item, cant_wait)
}
fn perform_inner(&self, f: WorkItem, cant_wait: bool) -> usize {
enum ThreadAction {
None,
Wake,
@@ -309,7 +301,7 @@ enum ThreadAction {
let thread_action = {
let mut data = self.shared.mutex.lock().expect("Mutex poisoned!");
local_thread_count = data.total_threads;
data.request_queue.push_back(f);
data.request_queue.push_back(work_item);
FLOG!(
iothread,
"enqueuing work item (count is ",
@@ -373,6 +365,40 @@ fn spawn_thread(&self) -> bool {
worker.run();
})
}
/// Invoke completions immediately.
pub fn invoke_completions(&self, ctx: &mut Reader) {
// Note the order here: consume the event signaller and then process events.
self.event_signaller.try_consume();
let queue = std::mem::take(&mut *self.completion_queue.lock().expect("Mutex poisoned!"));
for callback in queue {
(callback.0)(ctx);
}
}
/// Invoke completions, waiting up to `timeout` for completions to be available.
pub fn invoke_completions_with_timeout(&self, ctx: &mut Reader, timeout: Duration) {
let timeout = fd_readable_set::Timeout::Duration(timeout);
if fd_readable_set::is_fd_readable(self.event_signaller.read_fd(), timeout) {
self.invoke_completions(ctx);
}
}
/// Drain all threads.
/// Does nasty polling via select(); only used for testing.
#[cfg(test)]
pub(crate) fn drain_all(&self, ctx: &mut Reader) {
while self
.shared
.mutex
.lock()
.expect("Mutex poisoned!")
.total_threads
> 0
{
self.invoke_completions_with_timeout(ctx, Duration::from_millis(1000));
}
}
}
/// A `Sync` and `Send` wrapper for non-`Sync`/`Send` types.
@@ -464,7 +490,7 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
data.total_threads -= 1;
}
return result;
result
}
}
@@ -485,51 +511,27 @@ pub fn iothread_perform(f: impl FnOnce() + 'static + Send) {
/// It does its best to spawn a thread if all other threads are occupied. This is primarily for
/// cases where deferring creation of a new thread might lead to a deadlock.
pub fn iothread_perform_cant_wait(f: impl FnOnce() + 'static + Send) {
let thread_pool = borrow_io_thread_pool();
thread_pool.perform(f, true);
borrow_io_thread_pool().perform(f, true);
}
/// Return the read fd of the singleton ThreadPool event signaller.
/// This may be used with `poll()` or `select()` to multiplex iothread completions with other events.
pub fn iothread_port() -> i32 {
borrow_io_thread_pool().event_signaller.read_fd()
}
pub fn iothread_service_main_with_timeout(ctx: &mut Reader, timeout: Duration) {
use crate::fd_readable_set::Timeout;
if crate::fd_readable_set::is_fd_readable(iothread_port(), Timeout::Duration(timeout)) {
iothread_service_main(ctx);
}
borrow_io_thread_pool().invoke_completions_with_timeout(ctx, timeout);
}
pub fn iothread_service_main(ctx: &mut Reader) {
self::assert_is_main_thread();
// Note: the order here is important. We must consume events before handling requests, as
// posting uses the opposite order.
let pool = borrow_io_thread_pool();
pool.event_signaller.try_consume();
let queue = std::mem::take(&mut *pool.completion_queue.lock().expect("Mutex poisoned!"));
// Perform each completion in order.
for callback in queue {
(callback.0)(ctx);
}
borrow_io_thread_pool().invoke_completions(ctx);
}
/// Does nasty polling via select(), only used for testing.
#[cfg(test)]
pub(crate) fn iothread_drain_all(ctx: &mut Reader) {
while borrow_io_thread_pool()
.shared
.mutex
.lock()
.expect("Mutex poisoned!")
.total_threads
> 0
{
iothread_service_main_with_timeout(ctx, Duration::from_millis(1000));
}
borrow_io_thread_pool().drain_all(ctx);
}
/// `Debounce` is a simple class which executes one function on a background thread while enqueuing