mirror of
https://github.com/fish-shell/fish-shell.git
synced 2026-05-28 01:11:15 -03:00
Merge branch 'cleanup-thread-pool'
This merges changes that make thread pools instanced. We no longer have a single global thread pool. This results in significant simplifications especially in the reader (no more "canary").
This commit is contained in:
@@ -809,19 +809,28 @@ mod tests {
|
||||
use crate::common::wcs2osstring;
|
||||
use crate::env::{EnvVar, EnvVarFlags, VarTable};
|
||||
use crate::env_universal_common::{EnvUniversal, UvarFormat};
|
||||
use crate::reader::fake_scoped_reader;
|
||||
use crate::tests::prelude::*;
|
||||
use crate::threads::{iothread_drain_all, iothread_perform};
|
||||
use crate::wchar::prelude::*;
|
||||
use crate::wutil::{INVALID_FILE_ID, file_id_for_path};
|
||||
|
||||
const UVARS_PER_THREAD: usize = 8;
|
||||
const UVARS_TEST_PATH: &wstr = L!("test/fish_uvars_test/varsfile.txt");
|
||||
|
||||
fn test_universal_helper(x: usize) {
|
||||
/// Creates a unique temporary directory and file path for universal variable tests.
|
||||
/// Returns (directory_path, file_path).
|
||||
fn make_test_uvar_path(test_name: &str) -> (std::path::PathBuf, WString) {
|
||||
let test_dir = std::env::temp_dir().join(format!(
|
||||
"fish_test_{}_{:?}",
|
||||
test_name,
|
||||
std::thread::current().id()
|
||||
));
|
||||
let test_path = sprintf!("%s/varsfile.txt", test_dir.to_string_lossy());
|
||||
(test_dir, test_path)
|
||||
}
|
||||
|
||||
fn test_universal_helper(x: usize, path: &wstr) {
|
||||
let _cleanup = test_init();
|
||||
let mut uvars = EnvUniversal::new();
|
||||
uvars.initialize_at_path(UVARS_TEST_PATH.to_owned());
|
||||
uvars.initialize_at_path(path.to_owned());
|
||||
|
||||
for j in 0..UVARS_PER_THREAD {
|
||||
let key = sprintf!("key_%d_%d", x, j);
|
||||
@@ -841,23 +850,28 @@ fn test_universal_helper(x: usize) {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_universal() {
|
||||
let _cleanup = test_init();
|
||||
let _ = std::fs::remove_dir_all("test/fish_uvars_test/");
|
||||
std::fs::create_dir_all("test/fish_uvars_test/").unwrap();
|
||||
let parser = TestParser::new();
|
||||
|
||||
let mut reader = fake_scoped_reader(&parser);
|
||||
let (test_dir, test_path) = make_test_uvar_path("universal");
|
||||
let _ = std::fs::remove_dir_all(&test_dir);
|
||||
std::fs::create_dir_all(&test_dir).unwrap();
|
||||
|
||||
let threads = 1;
|
||||
let mut handles = Vec::new();
|
||||
|
||||
for i in 0..threads {
|
||||
iothread_perform(move || test_universal_helper(i));
|
||||
let path = test_path.to_owned();
|
||||
handles.push(std::thread::spawn(move || {
|
||||
test_universal_helper(i, &path);
|
||||
}));
|
||||
}
|
||||
|
||||
for h in handles {
|
||||
h.join().unwrap();
|
||||
}
|
||||
iothread_drain_all(&mut reader);
|
||||
|
||||
let mut uvars = EnvUniversal::new();
|
||||
uvars.initialize_at_path(UVARS_TEST_PATH.to_owned());
|
||||
uvars.initialize_at_path(test_path.to_owned());
|
||||
|
||||
for i in 0..threads {
|
||||
for j in 0..UVARS_PER_THREAD {
|
||||
@@ -875,7 +889,7 @@ fn test_universal() {
|
||||
}
|
||||
}
|
||||
|
||||
std::fs::remove_dir_all("test/fish_uvars_test/").unwrap();
|
||||
std::fs::remove_dir_all(&test_dir).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1017,18 +1031,18 @@ fn test_universal_parsing_legacy() {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_universal_callbacks() {
|
||||
let _cleanup = test_init();
|
||||
std::fs::create_dir_all("test/fish_uvars_test/").unwrap();
|
||||
let (test_dir, test_path) = make_test_uvar_path("callbacks");
|
||||
std::fs::create_dir_all(&test_dir).unwrap();
|
||||
let mut uvars1 = EnvUniversal::new();
|
||||
let mut uvars2 = EnvUniversal::new();
|
||||
let mut callbacks = uvars1
|
||||
.initialize_at_path(UVARS_TEST_PATH.to_owned())
|
||||
.initialize_at_path(test_path.to_owned())
|
||||
.unwrap_or_default();
|
||||
callbacks.append(
|
||||
&mut uvars2
|
||||
.initialize_at_path(UVARS_TEST_PATH.to_owned())
|
||||
.initialize_at_path(test_path.to_owned())
|
||||
.unwrap_or_default(),
|
||||
);
|
||||
|
||||
@@ -1084,7 +1098,7 @@ macro_rules! sync {
|
||||
assert_eq!(callbacks[1].val.as_ref().unwrap().as_string(), L!("1"));
|
||||
assert_eq!(callbacks[2].key, L!("delta"));
|
||||
assert_eq!(callbacks[2].val, None);
|
||||
std::fs::remove_dir_all("test/fish_uvars_test/").unwrap();
|
||||
std::fs::remove_dir_all(&test_dir).unwrap();
|
||||
}
|
||||
|
||||
#[test]
|
||||
@@ -1110,23 +1124,20 @@ macro_rules! validate {
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_universal_ok_to_save() {
|
||||
let _cleanup = test_init();
|
||||
// Ensure we don't try to save after reading from a newer fish.
|
||||
std::fs::create_dir_all("test/fish_uvars_test/").unwrap();
|
||||
let (test_dir, test_path) = make_test_uvar_path("ok_to_save");
|
||||
std::fs::create_dir_all(&test_dir).unwrap();
|
||||
let contents = b"# VERSION: 99999.99\n";
|
||||
std::fs::write(wcs2osstring(UVARS_TEST_PATH), contents).unwrap();
|
||||
std::fs::write(wcs2osstring(&test_path), contents).unwrap();
|
||||
|
||||
let before_id = file_id_for_path(UVARS_TEST_PATH);
|
||||
assert_ne!(
|
||||
before_id, INVALID_FILE_ID,
|
||||
"UVARS_TEST_PATH should be readable"
|
||||
);
|
||||
let before_id = file_id_for_path(&test_path);
|
||||
assert_ne!(before_id, INVALID_FILE_ID, "test_path should be readable");
|
||||
|
||||
let mut uvars = EnvUniversal::new();
|
||||
uvars
|
||||
.initialize_at_path(UVARS_TEST_PATH.to_owned())
|
||||
.initialize_at_path(test_path.to_owned())
|
||||
.unwrap_or_default();
|
||||
assert!(!uvars.is_ok_to_save(), "Should not be OK to save");
|
||||
uvars.sync();
|
||||
@@ -1137,11 +1148,8 @@ fn test_universal_ok_to_save() {
|
||||
);
|
||||
|
||||
// Ensure file is same.
|
||||
let after_id = file_id_for_path(UVARS_TEST_PATH);
|
||||
assert_eq!(
|
||||
before_id, after_id,
|
||||
"UVARS_TEST_PATH should not have changed",
|
||||
);
|
||||
std::fs::remove_dir_all("test/fish_uvars_test/").unwrap();
|
||||
let after_id = file_id_for_path(&test_path);
|
||||
assert_eq!(before_id, after_id, "test_path should not have changed",);
|
||||
std::fs::remove_dir_all(&test_dir).unwrap();
|
||||
}
|
||||
}
|
||||
|
||||
20
src/exec.rs
20
src/exec.rs
@@ -41,7 +41,7 @@
|
||||
};
|
||||
use crate::reader::{reader_run_count, safe_restore_term_mode};
|
||||
use crate::redirection::{Dup2List, dup2_list_resolve_chain};
|
||||
use crate::threads::{iothread_perform_cant_wait, is_forked_child};
|
||||
use crate::threads::{ThreadPool, is_forked_child};
|
||||
use crate::trace::trace_if_enabled_with_args;
|
||||
use crate::tty_handoff::TtyHandoff;
|
||||
use crate::wchar::prelude::*;
|
||||
@@ -60,8 +60,20 @@
|
||||
use std::num::NonZeroU32;
|
||||
use std::os::fd::{AsRawFd, OwnedFd, RawFd};
|
||||
use std::slice;
|
||||
use std::sync::atomic::Ordering;
|
||||
use std::sync::{Arc, atomic::AtomicUsize};
|
||||
use std::sync::{
|
||||
Arc, OnceLock,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
/// The singleton shared exec thread pool.
|
||||
/// This is used to write the output of internal processes (e.g. builtins)
|
||||
/// to their target fds.
|
||||
/// TODO: this IO could be multiplexed using FdMonitor.
|
||||
fn exec_thread_pool() -> &'static Arc<ThreadPool> {
|
||||
static EXEC_THREAD_POOL: OnceLock<Arc<ThreadPool>> = OnceLock::new();
|
||||
// Use an unbounded queue because otherwise we risk deadlock.
|
||||
EXEC_THREAD_POOL.get_or_init(|| ThreadPool::new(1, usize::MAX))
|
||||
}
|
||||
|
||||
/// Execute the processes specified by `j` in the parser \p.
|
||||
/// On a true return, the job was successfully launched and the parser will take responsibility for
|
||||
@@ -601,7 +613,7 @@ fn skip_err(&self) -> bool {
|
||||
// builtin_run provide this directly, rather than setting it in the process.
|
||||
f.success_status = p.status();
|
||||
|
||||
iothread_perform_cant_wait(move || {
|
||||
exec_thread_pool().perform(move || {
|
||||
let mut status = f.success_status;
|
||||
if !f.skip_out() {
|
||||
if let Err(err) = write_loop(&f.src_outfd, &f.outdata) {
|
||||
|
||||
@@ -21,6 +21,7 @@
|
||||
LOCKED_FILE_MODE, LockedFile, LockingMode, PotentialUpdate, WriteMethod, lock_and_load,
|
||||
rewrite_via_temporary_file,
|
||||
},
|
||||
threads::ThreadPool,
|
||||
wcstringutil::trim,
|
||||
};
|
||||
use std::{
|
||||
@@ -55,7 +56,7 @@
|
||||
parse_constants::{ParseTreeFlags, StatementDecoration},
|
||||
parse_util::{parse_util_detect_errors, parse_util_unescape_wildcards},
|
||||
path::{path_get_config, path_get_data, path_is_valid},
|
||||
threads::{assert_is_background_thread, iothread_perform},
|
||||
threads::assert_is_background_thread,
|
||||
util::{find_subslice, get_rng},
|
||||
wchar::prelude::*,
|
||||
wcstringutil::subsequence_in_string,
|
||||
@@ -357,6 +358,8 @@ struct HistoryImpl {
|
||||
loaded_old: bool, // false
|
||||
/// List of old items, as offsets into out mmap data.
|
||||
old_item_offsets: Vec<usize>,
|
||||
/// Thread pool for background operations.
|
||||
thread_pool: Arc<ThreadPool>,
|
||||
}
|
||||
|
||||
impl HistoryImpl {
|
||||
@@ -809,6 +812,8 @@ fn new(name: WString) -> Self {
|
||||
countdown_to_vacuum: None,
|
||||
loaded_old: false,
|
||||
old_item_offsets: Vec::new(),
|
||||
// Up to 8 threads, no soft min.
|
||||
thread_pool: ThreadPool::new(0, 8),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1361,9 +1366,10 @@ pub fn add_pending_with_file_detection(
|
||||
// and unblock the item.
|
||||
// Don't hold the lock while we perform this file detection.
|
||||
imp.add(item, /*pending=*/ true, to_disk);
|
||||
let thread_pool = Arc::clone(&imp.thread_pool);
|
||||
drop(imp);
|
||||
let vars_snapshot = vars.snapshot();
|
||||
iothread_perform(move || {
|
||||
thread_pool.perform(move || {
|
||||
// Don't hold the lock while we perform this file detection.
|
||||
let validated_paths = expand_and_detect_paths(potential_paths, &vars_snapshot);
|
||||
let mut imp = self.imp();
|
||||
|
||||
75
src/input.rs
75
src/input.rs
@@ -1,19 +1,14 @@
|
||||
use crate::common::{Named, bytes2wcstring, escape, get_by_sorted_name};
|
||||
use crate::common::{Named, escape, get_by_sorted_name};
|
||||
use crate::env::Environment;
|
||||
use crate::event;
|
||||
use crate::flog::FLOG;
|
||||
use crate::global_safety::RelaxedAtomicBool;
|
||||
use crate::input_common::{
|
||||
CharEvent, CharInputStyle, ImplicitEvent, InputData, InputEventQueuer, KeyMatchQuality,
|
||||
CharEvent, CharInputStyle, ImplicitEvent, InputEventQueuer, KeyMatchQuality,
|
||||
R_END_INPUT_FUNCTIONS, ReadlineCmd, match_key_event_to_key,
|
||||
};
|
||||
use crate::key::{self, Key, Modifiers, canonicalize_raw_escapes, ctrl};
|
||||
use crate::proc::job_reap;
|
||||
use crate::reader::{
|
||||
Reader, reader_reading_interrupted, reader_reset_interrupted, reader_schedule_prompt_repaint,
|
||||
};
|
||||
use crate::signal::signal_clear_cancel;
|
||||
use crate::threads::{assert_is_main_thread, iothread_service_main};
|
||||
use crate::reader::{Reader, reader_reset_interrupted};
|
||||
use crate::threads::assert_is_main_thread;
|
||||
use crate::wchar::prelude::*;
|
||||
use once_cell::sync::Lazy;
|
||||
use std::mem;
|
||||
@@ -377,68 +372,6 @@ pub fn init_input() {
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> InputEventQueuer for Reader<'a> {
|
||||
fn get_input_data(&self) -> &InputData {
|
||||
&self.data.input_data
|
||||
}
|
||||
|
||||
fn get_input_data_mut(&mut self) -> &mut InputData {
|
||||
&mut self.data.input_data
|
||||
}
|
||||
|
||||
fn prepare_to_select(&mut self) {
|
||||
// Fire any pending events and reap stray processes, including printing exit status messages.
|
||||
event::fire_delayed(self.parser);
|
||||
if job_reap(self.parser, true) {
|
||||
reader_schedule_prompt_repaint();
|
||||
}
|
||||
}
|
||||
|
||||
fn select_interrupted(&mut self) {
|
||||
// Readline commands may be bound to \cc which also sets the cancel flag.
|
||||
// See #6937, #8125.
|
||||
signal_clear_cancel();
|
||||
|
||||
// Fire any pending events and reap stray processes, including printing exit status messages.
|
||||
let parser = self.parser;
|
||||
event::fire_delayed(parser);
|
||||
if job_reap(parser, true) {
|
||||
reader_schedule_prompt_repaint();
|
||||
}
|
||||
|
||||
// Tell the reader an event occurred.
|
||||
if reader_reading_interrupted(self) != 0 {
|
||||
self.enqueue_interrupt_key();
|
||||
return;
|
||||
}
|
||||
self.push_front(CharEvent::from_check_exit());
|
||||
}
|
||||
|
||||
fn uvar_change_notified(&mut self) {
|
||||
self.parser.sync_uvars_and_fire(true /* always */);
|
||||
}
|
||||
|
||||
fn ioport_notified(&mut self) {
|
||||
iothread_service_main(self);
|
||||
}
|
||||
|
||||
fn paste_start_buffering(&mut self) {
|
||||
self.input_data.paste_buffer = Some(vec![]);
|
||||
self.push_front(CharEvent::from_readline(ReadlineCmd::BeginUndoGroup));
|
||||
}
|
||||
|
||||
fn paste_commit(&mut self) {
|
||||
self.push_front(CharEvent::from_readline(ReadlineCmd::EndUndoGroup));
|
||||
let Some(buffer) = self.input_data.paste_buffer.take() else {
|
||||
return;
|
||||
};
|
||||
self.push_front(CharEvent::Command(sprintf!(
|
||||
"__fish_paste %s",
|
||||
escape(&bytes2wcstring(&buffer))
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
/// A struct which allows accumulating input events, or returns them to the queue.
|
||||
/// This contains a list of events which have been dequeued, and a current index into that list.
|
||||
pub struct EventQueuePeeker<'q, Queuer: InputEventQueuer + ?Sized> {
|
||||
|
||||
@@ -10,7 +10,6 @@
|
||||
canonicalize_keyed_control_char, char_to_symbol, function_key, shift,
|
||||
};
|
||||
use crate::reader::reader_test_and_clear_interrupted;
|
||||
use crate::threads::iothread_port;
|
||||
use crate::tty_handoff::{
|
||||
SCROLL_CONTENT_UP_TERMINFO_CODE, XTVERSION, maybe_set_kitty_keyboard_capability,
|
||||
maybe_set_scroll_content_up_capability,
|
||||
@@ -477,14 +476,13 @@ fn readb(in_fd: RawFd) -> Option<u8> {
|
||||
Some(c)
|
||||
}
|
||||
|
||||
fn next_input_event(in_fd: RawFd, timeout: Timeout) -> InputEventTrigger {
|
||||
fn next_input_event(in_fd: RawFd, ioport_fd: RawFd, timeout: Timeout) -> InputEventTrigger {
|
||||
let mut fdset = FdReadableSet::new();
|
||||
loop {
|
||||
fdset.clear();
|
||||
fdset.add(in_fd);
|
||||
|
||||
// Add the completion ioport.
|
||||
let ioport_fd = iothread_port();
|
||||
// Add the completion ioport (possibly -1 - a no-op).
|
||||
fdset.add(ioport_fd);
|
||||
|
||||
// Get the uvar notifier fd (possibly none).
|
||||
@@ -761,6 +759,7 @@ fn readch(&mut self) -> CharEvent {
|
||||
|
||||
match next_input_event(
|
||||
self.get_in_fd(),
|
||||
self.get_ioport_fd(),
|
||||
if self.is_blocked_querying() {
|
||||
Timeout::Duration(self.get_input_data().blocking_query_timeout.unwrap())
|
||||
} else {
|
||||
@@ -809,7 +808,11 @@ fn readch(&mut self) -> CharEvent {
|
||||
match decode_one_codepoint_utf8(&mut seq, InvalidPolicy::Error, &buffer) {
|
||||
DecodeState::Incomplete => {
|
||||
buffer.push(
|
||||
match next_input_event(self.get_in_fd(), Timeout::Forever) {
|
||||
match next_input_event(
|
||||
self.get_in_fd(),
|
||||
self.get_ioport_fd(),
|
||||
Timeout::Forever,
|
||||
) {
|
||||
InputEventTrigger::Byte(b) => b,
|
||||
_ => 0,
|
||||
},
|
||||
@@ -1425,6 +1428,11 @@ fn get_in_fd(&self) -> RawFd {
|
||||
self.get_input_data().in_fd
|
||||
}
|
||||
|
||||
/// Return the fd of the IO port, or -1 if none.
|
||||
fn get_ioport_fd(&self) -> RawFd {
|
||||
-1
|
||||
}
|
||||
|
||||
/// Return the input data. This is to be implemented by the concrete type.
|
||||
fn get_input_data(&self) -> &InputData;
|
||||
fn get_input_data_mut(&mut self) -> &mut InputData;
|
||||
|
||||
@@ -57,7 +57,6 @@
|
||||
pub mod proc;
|
||||
pub mod re;
|
||||
pub mod reader;
|
||||
pub mod reader_history_search;
|
||||
pub mod redirection;
|
||||
pub mod screen;
|
||||
pub mod signal;
|
||||
|
||||
@@ -1423,16 +1423,13 @@ mod tests {
|
||||
};
|
||||
use crate::parse_tree::{LineCounter, parse_source};
|
||||
use crate::parse_util::{parse_util_detect_errors, parse_util_detect_errors_in_argument};
|
||||
use crate::parser::BlockType;
|
||||
use crate::reader::{fake_scoped_reader, reader_reset_interrupted};
|
||||
use crate::signal::{signal_clear_cancel, signal_reset_handlers, signal_set_handlers};
|
||||
use crate::tests::prelude::*;
|
||||
use crate::threads::iothread_perform;
|
||||
use crate::wchar::prelude::*;
|
||||
use crate::wcstringutil::join_strings;
|
||||
use libc::SIGINT;
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_parser() {
|
||||
@@ -2056,15 +2053,12 @@ fn test_eval_recursion_detection() {
|
||||
&IoChain::new(),
|
||||
);
|
||||
|
||||
parser.eval_with(
|
||||
parser.eval(
|
||||
L!(concat!(
|
||||
"function recursive1 ; recursive2 ; end ; ",
|
||||
"function recursive2 ; recursive1 ; end ; recursive1; ",
|
||||
)),
|
||||
&IoChain::new(),
|
||||
None,
|
||||
BlockType::top,
|
||||
/*test_only_suppress_stderr=*/ true,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2075,13 +2069,7 @@ fn test_eval_illegal_exit_code() {
|
||||
let parser = TestParser::new();
|
||||
macro_rules! validate {
|
||||
($cmd:expr, $result:expr) => {
|
||||
parser.eval_with(
|
||||
$cmd,
|
||||
&IoChain::new(),
|
||||
None,
|
||||
BlockType::top,
|
||||
/*test_only_suppress_stderr=*/ true,
|
||||
);
|
||||
parser.eval($cmd, &IoChain::new());
|
||||
let exit_status = parser.get_last_status();
|
||||
assert_eq!(exit_status, parser.get_last_status());
|
||||
};
|
||||
@@ -2105,12 +2093,9 @@ macro_rules! validate {
|
||||
fn test_eval_empty_function_name() {
|
||||
let _cleanup = test_init();
|
||||
let parser = TestParser::new();
|
||||
parser.eval_with(
|
||||
parser.eval(
|
||||
L!("function '' ; echo fail; exit 42 ; end ; ''"),
|
||||
&IoChain::new(),
|
||||
None,
|
||||
BlockType::top,
|
||||
/*test_only_suppress_stderr=*/ true,
|
||||
);
|
||||
}
|
||||
|
||||
@@ -2135,7 +2120,7 @@ fn test_1_cancellation(parser: &Parser, src: &wstr) {
|
||||
let delay = Duration::from_millis(100);
|
||||
#[allow(clippy::unnecessary_cast)]
|
||||
let thread = unsafe { libc::pthread_self() } as usize;
|
||||
iothread_perform(move || {
|
||||
std::thread::spawn(move || {
|
||||
// Wait a while and then SIGINT the main thread.
|
||||
std::thread::sleep(delay);
|
||||
unsafe {
|
||||
|
||||
@@ -75,6 +75,8 @@ pub fn active(&self) -> bool {
|
||||
pub fn by_token(&self) -> bool {
|
||||
matches!(self.mode, SearchMode::Token | SearchMode::LastToken)
|
||||
}
|
||||
// Included for completeness.
|
||||
#[allow(dead_code)]
|
||||
pub fn by_line(&self) -> bool {
|
||||
self.mode == SearchMode::Line
|
||||
}
|
||||
78
src/reader/input.rs
Normal file
78
src/reader/input.rs
Normal file
@@ -0,0 +1,78 @@
|
||||
//! Reader implementation of InputEventQueuer.
|
||||
use std::os::fd::RawFd;
|
||||
|
||||
use crate::common::{bytes2wcstring, escape};
|
||||
use crate::event;
|
||||
use crate::input_common::{CharEvent, InputData, InputEventQueuer, ReadlineCmd};
|
||||
use crate::proc::job_reap;
|
||||
use crate::signal::signal_clear_cancel;
|
||||
|
||||
use super::{Reader, reader_reading_interrupted, reader_schedule_prompt_repaint};
|
||||
|
||||
impl<'a> InputEventQueuer for Reader<'a> {
|
||||
fn get_input_data(&self) -> &InputData {
|
||||
&self.input_data
|
||||
}
|
||||
|
||||
fn get_input_data_mut(&mut self) -> &mut InputData {
|
||||
&mut self.input_data
|
||||
}
|
||||
|
||||
fn get_ioport_fd(&self) -> RawFd {
|
||||
self.debouncers.event_signaller_read_fd()
|
||||
}
|
||||
|
||||
fn prepare_to_select(&mut self) {
|
||||
// Fire any pending events and reap stray processes, including printing exit status messages.
|
||||
event::fire_delayed(self.parser);
|
||||
if job_reap(self.parser, true) {
|
||||
reader_schedule_prompt_repaint();
|
||||
}
|
||||
}
|
||||
|
||||
fn select_interrupted(&mut self) {
|
||||
// Readline commands may be bound to \cc which also sets the cancel flag.
|
||||
// See #6937, #8125.
|
||||
signal_clear_cancel();
|
||||
|
||||
// Fire any pending events and reap stray processes, including printing exit status messages.
|
||||
let parser = self.parser;
|
||||
event::fire_delayed(parser);
|
||||
if job_reap(parser, true) {
|
||||
reader_schedule_prompt_repaint();
|
||||
}
|
||||
|
||||
// Tell the reader an event occurred.
|
||||
if reader_reading_interrupted(self) != 0 {
|
||||
self.enqueue_interrupt_key();
|
||||
return;
|
||||
}
|
||||
self.push_front(CharEvent::from_check_exit());
|
||||
}
|
||||
|
||||
fn uvar_change_notified(&mut self) {
|
||||
self.parser.sync_uvars_and_fire(true /* always */);
|
||||
}
|
||||
|
||||
fn ioport_notified(&mut self) {
|
||||
// Our iothread signaller was posted, indicating some debouncer has a new result.
|
||||
self.debouncers.event_signaller.try_consume();
|
||||
self.service_debounced_results();
|
||||
}
|
||||
|
||||
fn paste_start_buffering(&mut self) {
|
||||
self.input_data.paste_buffer = Some(vec![]);
|
||||
self.push_front(CharEvent::from_readline(ReadlineCmd::BeginUndoGroup));
|
||||
}
|
||||
|
||||
fn paste_commit(&mut self) {
|
||||
self.push_front(CharEvent::from_readline(ReadlineCmd::EndUndoGroup));
|
||||
let Some(buffer) = self.input_data.paste_buffer.take() else {
|
||||
return;
|
||||
};
|
||||
self.push_front(CharEvent::Command(sprintf!(
|
||||
"__fish_paste %s",
|
||||
escape(&bytes2wcstring(&buffer))
|
||||
)));
|
||||
}
|
||||
}
|
||||
53
src/reader/iothreads.rs
Normal file
53
src/reader/iothreads.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
//! A thread pool for handling operations related to the interactive reader
|
||||
//! which might block, such as file I/O or completions.
|
||||
|
||||
use super::{Reader, reader};
|
||||
use crate::fd_monitor::FdEventSignaller;
|
||||
use crate::threads::ThreadPool;
|
||||
use crate::threads::debounce::Debounce;
|
||||
use std::os::unix::io::RawFd;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
/// The maximum number of I/O threads in a pool used to service background reader operations.
|
||||
const IO_MAX_THREADS: usize = 16;
|
||||
|
||||
/// A reader callback after some reader I/O operation completes.
|
||||
pub(super) type Callback = Box<dyn for<'a> FnOnce(&mut Reader<'a>) + Send>;
|
||||
|
||||
/// Separate debouncers for various reader operations.
|
||||
pub(super) struct Debouncers {
|
||||
// The event signaller shared by all debouncers.
|
||||
pub event_signaller: Arc<FdEventSignaller>,
|
||||
// Debounce autosuggestion computations.
|
||||
pub autosuggestions: Debounce<reader::AutosuggestionResult>,
|
||||
// Debounce syntax highlighting.
|
||||
pub highlight: Debounce<reader::HighlightResult>,
|
||||
// Debounce history pager computations. This holds a callback, not a single value,
|
||||
// both to demonstrate the technique and because the callback can capture local variables.
|
||||
pub history_pager: Debounce<Callback>,
|
||||
}
|
||||
|
||||
impl Debouncers {
|
||||
pub fn new() -> Self {
|
||||
let pool = ThreadPool::new(1, IO_MAX_THREADS);
|
||||
// These timeouts control how long until a thread is considered abandoned and
|
||||
// any queued work is assigned to a new thread.
|
||||
let event_signaller = Arc::new(FdEventSignaller::new());
|
||||
const HIGHLIGHT_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
const HISTORY_PAGER_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
const AUTOSUGGEST_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
Self {
|
||||
autosuggestions: Debounce::new(&pool, &event_signaller, AUTOSUGGEST_TIMEOUT),
|
||||
highlight: Debounce::new(&pool, &event_signaller, HIGHLIGHT_TIMEOUT),
|
||||
history_pager: Debounce::new(&pool, &event_signaller, HISTORY_PAGER_TIMEOUT),
|
||||
event_signaller,
|
||||
}
|
||||
}
|
||||
|
||||
/// Return the read fd of the event signaller.
|
||||
/// This may be used with `poll()` or `select()` to multiplex iothread completions with other events.
|
||||
pub fn event_signaller_read_fd(&self) -> RawFd {
|
||||
self.event_signaller.read_fd()
|
||||
}
|
||||
}
|
||||
8
src/reader/mod.rs
Normal file
8
src/reader/mod.rs
Normal file
@@ -0,0 +1,8 @@
|
||||
mod history_search;
|
||||
|
||||
mod input;
|
||||
pub mod iothreads;
|
||||
#[allow(clippy::module_inception)]
|
||||
pub mod reader;
|
||||
|
||||
pub use reader::*;
|
||||
@@ -38,7 +38,6 @@
|
||||
use std::os::fd::BorrowedFd;
|
||||
use std::os::fd::{AsRawFd, RawFd};
|
||||
use std::pin::Pin;
|
||||
use std::rc::Rc;
|
||||
#[cfg(target_has_atomic = "64")]
|
||||
use std::sync::atomic::AtomicU64;
|
||||
use std::sync::atomic::{AtomicI32, AtomicU8, AtomicU32, Ordering};
|
||||
@@ -47,6 +46,8 @@
|
||||
|
||||
use errno::{Errno, errno};
|
||||
|
||||
use super::history_search::{ReaderHistorySearch, SearchMode, smartcase_flags};
|
||||
use super::iothreads::{self, Debouncers};
|
||||
use crate::abbrs::abbrs_match;
|
||||
use crate::ast::{self, Kind, is_same_node};
|
||||
use crate::builtins::shared::ErrorCode;
|
||||
@@ -115,7 +116,6 @@
|
||||
have_proc_stat, hup_jobs, is_interactive_session, job_reap, jobs_requiring_warning_on_exit,
|
||||
print_exit_warning_for_jobs, proc_update_jiffies,
|
||||
};
|
||||
use crate::reader_history_search::{ReaderHistorySearch, SearchMode, smartcase_flags};
|
||||
use crate::screen::is_dumb;
|
||||
use crate::screen::{CharOffset, Screen, screen_force_clear_to_end};
|
||||
use crate::should_flog;
|
||||
@@ -135,10 +135,7 @@
|
||||
use crate::termsize::{termsize_invalidate_tty, termsize_last, termsize_update};
|
||||
use crate::text_face::TextFace;
|
||||
use crate::text_face::parse_text_face;
|
||||
use crate::threads::{
|
||||
Debounce, assert_is_background_thread, assert_is_main_thread,
|
||||
iothread_service_main_with_timeout,
|
||||
};
|
||||
use crate::threads::{assert_is_background_thread, assert_is_main_thread};
|
||||
use crate::tokenizer::quote_end;
|
||||
use crate::tokenizer::variable_assignment_equals_pos;
|
||||
use crate::tokenizer::{
|
||||
@@ -212,25 +209,6 @@ fn commandline_state_snapshot() -> MutexGuard<'static, CommandlineState> {
|
||||
/// background threads to notice it and skip doing work that they would otherwise have to do.
|
||||
static GENERATION: AtomicU32 = AtomicU32::new(0);
|
||||
|
||||
/// Get the debouncer for autosuggestions and background highlighting.
|
||||
fn debounce_autosuggestions() -> &'static Debounce {
|
||||
const AUTOSUGGEST_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
static RES: once_cell::race::OnceBox<Debounce> = once_cell::race::OnceBox::new();
|
||||
RES.get_or_init(|| Box::new(Debounce::new(AUTOSUGGEST_TIMEOUT)))
|
||||
}
|
||||
|
||||
fn debounce_highlighting() -> &'static Debounce {
|
||||
const HIGHLIGHT_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
static RES: once_cell::race::OnceBox<Debounce> = once_cell::race::OnceBox::new();
|
||||
RES.get_or_init(|| Box::new(Debounce::new(HIGHLIGHT_TIMEOUT)))
|
||||
}
|
||||
|
||||
fn debounce_history_pager() -> &'static Debounce {
|
||||
const HISTORY_PAGER_TIMEOUT: Duration = Duration::from_millis(500);
|
||||
static RES: once_cell::race::OnceBox<Debounce> = once_cell::race::OnceBox::new();
|
||||
RES.get_or_init(|| Box::new(Debounce::new(HISTORY_PAGER_TIMEOUT)))
|
||||
}
|
||||
|
||||
fn redirect_tty_after_sighup() {
|
||||
use std::fs::OpenOptions;
|
||||
|
||||
@@ -645,9 +623,6 @@ enum TransientEdit {
|
||||
/// reader_readline() calls are nested. This happens when the 'read' builtin is used.
|
||||
/// ReaderData does not contain a Parser - by itself it cannot execute fish script.
|
||||
pub struct ReaderData {
|
||||
/// We could put the entire thing in an Rc but Rc::get_unchecked_mut is not yet stable.
|
||||
/// This is sufficient for our use.
|
||||
canary: Rc<()>,
|
||||
/// Configuration for the reader.
|
||||
conf: ReaderConfig,
|
||||
/// String containing the whole current commandline.
|
||||
@@ -735,9 +710,13 @@ pub struct ReaderData {
|
||||
in_flight_autosuggest_request: WString,
|
||||
|
||||
rls: Option<ReadlineLoopState>,
|
||||
|
||||
/// Support for I/O threads associated with this reader state, including debouncers.
|
||||
pub(super) debouncers: Debouncers,
|
||||
}
|
||||
|
||||
/// Reader is ReaderData equippeed with a Parser, so it can execute fish script.
|
||||
/// Reader is ReaderData equipped with a Parser, so it can execute fish script.
|
||||
/// It also provides access to I/O threads.
|
||||
pub struct Reader<'a> {
|
||||
pub data: &'a mut ReaderData,
|
||||
pub parser: &'a Parser,
|
||||
@@ -762,6 +741,20 @@ impl<'a> Reader<'a> {
|
||||
fn vars(&self) -> &dyn Environment {
|
||||
self.parser.vars()
|
||||
}
|
||||
|
||||
pub(super) fn service_debounced_results(&mut self) {
|
||||
// Some iothread operation completed, indicating a debouncer has a new result.
|
||||
// Check all of them.
|
||||
if let Some(r) = self.debouncers.autosuggestions.take_result() {
|
||||
self.autosuggest_completed(r);
|
||||
}
|
||||
if let Some(r) = self.debouncers.highlight.take_result() {
|
||||
self.highlight_completed(r);
|
||||
}
|
||||
if let Some(cb) = self.debouncers.history_pager.take_result() {
|
||||
cb(self);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Read commands from \c fd until encountering EOF.
|
||||
@@ -1319,7 +1312,6 @@ fn new(
|
||||
command_line.set_position(state.cursor_pos);
|
||||
}
|
||||
Pin::new(Box::new(Self {
|
||||
canary: Rc::new(()),
|
||||
conf,
|
||||
command_line,
|
||||
command_line_transient_edit: None,
|
||||
@@ -1357,6 +1349,7 @@ fn new(
|
||||
in_flight_highlight_request: Default::default(),
|
||||
in_flight_autosuggest_request: Default::default(),
|
||||
rls: None,
|
||||
debouncers: Debouncers::new(),
|
||||
}))
|
||||
}
|
||||
|
||||
@@ -4764,7 +4757,7 @@ fn exec_prompt(&mut self, full_prompt: bool, final_prompt: bool) {
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Autosuggestion {
|
||||
pub(super) struct Autosuggestion {
|
||||
// The text to use, as an extension/replacement of the current line.
|
||||
text: WString,
|
||||
|
||||
@@ -4792,7 +4785,7 @@ fn is_empty(&self) -> bool {
|
||||
|
||||
/// The result of an autosuggestion computation.
|
||||
#[derive(Default)]
|
||||
struct AutosuggestionResult {
|
||||
pub(super) struct AutosuggestionResult {
|
||||
// The autosuggestion.
|
||||
autosuggestion: Autosuggestion,
|
||||
|
||||
@@ -5096,14 +5089,7 @@ fn update_autosuggestion(&mut self) {
|
||||
el.position(),
|
||||
self.history.clone(),
|
||||
);
|
||||
let canary = Rc::downgrade(&self.canary);
|
||||
let completion = move |zelf: &mut Reader, result| {
|
||||
if canary.upgrade().is_none() {
|
||||
return;
|
||||
}
|
||||
zelf.autosuggest_completed(result);
|
||||
};
|
||||
debounce_autosuggestions().perform_with_completion(performer, completion);
|
||||
self.debouncers.autosuggestions.perform(performer);
|
||||
}
|
||||
|
||||
fn is_at_end(&self) -> bool {
|
||||
@@ -5195,7 +5181,7 @@ fn accept_autosuggestion(&mut self, amount: AutosuggestionPortion) {
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct HighlightResult {
|
||||
pub(super) struct HighlightResult {
|
||||
colors: Vec<HighlightSpec>,
|
||||
text: WString,
|
||||
}
|
||||
@@ -5223,7 +5209,7 @@ fn get_highlight_performer(
|
||||
}
|
||||
|
||||
impl<'a> Reader<'a> {
|
||||
fn highlight_complete(&mut self, result: HighlightResult) {
|
||||
fn highlight_completed(&mut self, result: HighlightResult) {
|
||||
assert_is_main_thread();
|
||||
self.in_flight_highlight_request.clear();
|
||||
if result.text == self.command_line.text() {
|
||||
@@ -5250,14 +5236,7 @@ fn super_highlight_me_plenty(&mut self) {
|
||||
FLOG!(reader_render, "Highlighting");
|
||||
let highlight_performer =
|
||||
get_highlight_performer(self.parser, &self.command_line, /*io_ok=*/ true);
|
||||
let canary = Rc::downgrade(&self.canary);
|
||||
let completion = move |zelf: &mut Reader, result| {
|
||||
if canary.upgrade().is_none() {
|
||||
return;
|
||||
}
|
||||
zelf.highlight_complete(result);
|
||||
};
|
||||
debounce_highlighting().perform_with_completion(highlight_performer, completion);
|
||||
self.debouncers.highlight.perform(highlight_performer);
|
||||
}
|
||||
|
||||
/// Finish up any outstanding syntax highlighting, before execution.
|
||||
@@ -5283,10 +5262,9 @@ fn finish_highlighting_before_exec(&mut self) {
|
||||
let deadline = now + HIGHLIGHT_TIMEOUT_FOR_EXECUTION;
|
||||
while now < deadline {
|
||||
let timeout = deadline - now;
|
||||
iothread_service_main_with_timeout(self, timeout);
|
||||
|
||||
// Note iothread_service_main_with_timeout will reentrantly modify us,
|
||||
// by invoking a completion.
|
||||
if let Some(result) = self.debouncers.highlight.take_result_with_timeout(timeout) {
|
||||
self.highlight_completed(result);
|
||||
}
|
||||
if self.in_flight_highlight_request.is_empty() {
|
||||
break;
|
||||
}
|
||||
@@ -5302,12 +5280,12 @@ fn finish_highlighting_before_exec(&mut self) {
|
||||
// We need to do a quick highlight without I/O.
|
||||
let highlight_no_io =
|
||||
get_highlight_performer(self.parser, &self.command_line, /*io_ok=*/ false);
|
||||
self.highlight_complete(highlight_no_io());
|
||||
self.highlight_completed(highlight_no_io());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
struct HistoryPagerResult {
|
||||
pub(super) struct HistoryPagerResult {
|
||||
matched_commands: Vec<Completion>,
|
||||
range: Range<usize>,
|
||||
first_shown: usize,
|
||||
@@ -5315,7 +5293,7 @@ struct HistoryPagerResult {
|
||||
}
|
||||
|
||||
#[derive(Eq, PartialEq)]
|
||||
enum HistoryPagerInvocation {
|
||||
pub(super) enum HistoryPagerInvocation {
|
||||
Anew,
|
||||
Advance,
|
||||
Refresh,
|
||||
@@ -5423,52 +5401,56 @@ fn fill_history_pager(
|
||||
}
|
||||
}
|
||||
let search_term = self.pager.search_field_line.text().to_owned();
|
||||
let performer = {
|
||||
let history = self.history.clone();
|
||||
let search_term = search_term.clone();
|
||||
move || history_pager_search(&history, direction, motion, index, &search_term)
|
||||
// Get a performer that produces the history pager result.
|
||||
let history = self.history.clone();
|
||||
let search_term = search_term.clone();
|
||||
let performer = move || -> iothreads::Callback {
|
||||
let result = history_pager_search(&history, direction, motion, index, &search_term);
|
||||
Box::new(move |r: &mut Reader| {
|
||||
r.fill_history_pager_complete(result, why, old_pager_index)
|
||||
})
|
||||
};
|
||||
let canary = Rc::downgrade(&self.canary);
|
||||
let completion = move |zelf: &mut Reader, result: HistoryPagerResult| {
|
||||
if canary.upgrade().is_none() {
|
||||
return;
|
||||
}
|
||||
if search_term != zelf.pager.search_field_line.text() {
|
||||
return; // Stale request.
|
||||
}
|
||||
let history_size = zelf.history.size();
|
||||
let Some(history_pager) = zelf.history_pager.as_mut() else {
|
||||
return; // Pager has been closed.
|
||||
self.debouncers.history_pager.perform(performer);
|
||||
}
|
||||
}
|
||||
|
||||
impl<'a> Reader<'a> {
|
||||
fn fill_history_pager_complete(
|
||||
&mut self,
|
||||
result: HistoryPagerResult,
|
||||
why: HistoryPagerInvocation,
|
||||
old_pager_index: Option<usize>,
|
||||
) {
|
||||
let history_size = self.history.size();
|
||||
let Some(history_pager) = self.history_pager.as_mut() else {
|
||||
return; // Pager has been closed.
|
||||
};
|
||||
assert!(result.range.start < result.range.end);
|
||||
*history_pager = result.range;
|
||||
self.pager.extra_progress_text =
|
||||
if !result.matched_commands.is_empty() && *history_pager != (0..history_size + 1) {
|
||||
wgettext_fmt!(
|
||||
"Items %u to %u of %u",
|
||||
match history_pager.start {
|
||||
0 => 1,
|
||||
_ => result.first_shown,
|
||||
},
|
||||
history_pager.end - 1,
|
||||
history_size
|
||||
)
|
||||
} else {
|
||||
L!("").to_owned()
|
||||
};
|
||||
assert!(result.range.start < result.range.end);
|
||||
*history_pager = result.range;
|
||||
zelf.pager.extra_progress_text =
|
||||
if !result.matched_commands.is_empty() && *history_pager != (0..history_size + 1) {
|
||||
wgettext_fmt!(
|
||||
"Items %u to %u of %u",
|
||||
match history_pager.start {
|
||||
0 => 1,
|
||||
_ => result.first_shown,
|
||||
},
|
||||
history_pager.end - 1,
|
||||
history_size
|
||||
)
|
||||
} else {
|
||||
L!("").to_owned()
|
||||
};
|
||||
zelf.pager.set_completions(&result.matched_commands, false);
|
||||
if why == HistoryPagerInvocation::Refresh {
|
||||
zelf.pager.set_selected_completion_index(old_pager_index);
|
||||
zelf.pager_selection_changed();
|
||||
}
|
||||
if let Some(motion) = result.motion {
|
||||
zelf.select_completion_in_direction(motion, true);
|
||||
}
|
||||
zelf.super_highlight_me_plenty();
|
||||
zelf.layout_and_repaint(L!("history-pager"));
|
||||
};
|
||||
let debouncer = debounce_history_pager();
|
||||
debouncer.perform_with_completion(performer, completion);
|
||||
self.pager.set_completions(&result.matched_commands, false);
|
||||
if why == HistoryPagerInvocation::Refresh {
|
||||
self.pager.set_selected_completion_index(old_pager_index);
|
||||
self.pager_selection_changed();
|
||||
}
|
||||
if let Some(motion) = result.motion {
|
||||
self.select_completion_in_direction(motion, true);
|
||||
}
|
||||
self.super_highlight_me_plenty();
|
||||
self.layout_and_repaint(L!("history-pager"));
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6735,9 +6717,9 @@ fn completion_insert(
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{combine_command_and_autosuggestion, completion_apply_to_command_line};
|
||||
use crate::complete::CompleteFlags;
|
||||
use crate::operation_context::{OperationContext, no_cancel};
|
||||
use crate::reader::{combine_command_and_autosuggestion, completion_apply_to_command_line};
|
||||
use crate::tests::prelude::*;
|
||||
use crate::wchar::prelude::*;
|
||||
|
||||
303
src/threads/debounce.rs
Normal file
303
src/threads/debounce.rs
Normal file
@@ -0,0 +1,303 @@
|
||||
use super::ThreadPool;
|
||||
use crate::fd_monitor::FdEventSignaller;
|
||||
use crate::fd_readable_set;
|
||||
/// Support for debounced background execution of functions.
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
/// A debounced function to execute.
|
||||
pub(super) type WorkItem = Box<dyn FnOnce() + 'static + Send>;
|
||||
|
||||
/// `Debounce` is a simple struct which executes one function on a background thread while enqueuing
|
||||
/// at most one more. Subsequent execution requests overwrite the enqueued one. It takes an optional
|
||||
/// timeout; if a handler does not finish within the timeout then a new thread is spawned to service
|
||||
/// the remaining request.
|
||||
///
|
||||
/// Debounce implementation note: we would like to enqueue at most one request, except if a thread
|
||||
/// hangs (e.g. on fs access) then we do not want to block indefinitely - such threads are called
|
||||
/// "abandoned". This is implemented via a monotone uint64 counter, called a token. Every time we
|
||||
/// spawn a thread, we increment the token. When the thread has completed running a work item, it
|
||||
/// compares its token to the active token; if they differ then this thread was abandoned.
|
||||
#[derive(Clone)]
|
||||
pub struct Debounce<R> {
|
||||
/// The thread pool to use for background execution.
|
||||
pool: Arc<ThreadPool>,
|
||||
/// The timeout after which a running thread is considered abandoned.
|
||||
timeout: Duration,
|
||||
/// The data shared between a [`Debounce`] instance and its thread.
|
||||
data: Arc<Mutex<DebounceData<R>>>,
|
||||
/// An event signaller used to indicate that a result has arrived.
|
||||
/// Note the usage order here matters:
|
||||
/// 1. To enqueue a result, first store it, then post to the event signaller.
|
||||
/// 2. To service a result, first consume the event signaller, then process the result.
|
||||
event_signaller: Arc<FdEventSignaller>,
|
||||
}
|
||||
|
||||
/// The data shared between a [`Debounce`] and its thread.
|
||||
struct DebounceData<R> {
|
||||
/// The (one or none) next enqueued request, overwritten each time a new call to
|
||||
/// [`Debounce::perform()`] is made.
|
||||
next_req: Option<WorkItem>,
|
||||
/// The non-zero token of the current non-abandoned thread or `None` if no thread is running.
|
||||
active_token: Option<NonZeroU64>,
|
||||
/// The next token to use when spawning a thread.
|
||||
next_token: NonZeroU64,
|
||||
/// The start time of the most recently spawned thread or request (if any).
|
||||
start_time: Instant,
|
||||
/// The most recent result, at most one. This is overwritten by the most recent completing request.
|
||||
result: Option<R>,
|
||||
}
|
||||
|
||||
impl<R: Send + 'static> Debounce<R> {
|
||||
pub fn new(
|
||||
pool: &Arc<ThreadPool>,
|
||||
event_signaller: &Arc<FdEventSignaller>,
|
||||
timeout: Duration,
|
||||
) -> Self {
|
||||
Self {
|
||||
pool: Arc::clone(pool),
|
||||
timeout,
|
||||
event_signaller: Arc::clone(event_signaller),
|
||||
data: Arc::new(Mutex::new(DebounceData {
|
||||
next_req: None,
|
||||
active_token: None,
|
||||
next_token: NonZeroU64::new(1).unwrap(),
|
||||
start_time: Instant::now(),
|
||||
result: None,
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run an iteration in the background with the given thread token. Returns `true` if we handled
|
||||
/// a request or `false` if there were no requests to handle (in which case the debounce thread
|
||||
/// exits).
|
||||
///
|
||||
/// Note that this method is called from a background thread.
|
||||
fn run_next(data: &Mutex<DebounceData<R>>, token: NonZeroU64) -> bool {
|
||||
let request = {
|
||||
let mut data = data.lock().expect("Mutex poisoned!");
|
||||
if let Some(req) = data.next_req.take() {
|
||||
data.start_time = Instant::now();
|
||||
req
|
||||
} else {
|
||||
// There is no pending request. Mark this token as no longer running.
|
||||
if Some(token) == data.active_token {
|
||||
data.active_token = None;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
// Execute request after unlocking the mutex.
|
||||
(request)();
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Enqueue `handler` to be performed on a background thread. If another function is already
|
||||
/// enqueued, this overwrites it and that function will not be executed.
|
||||
///
|
||||
/// The result is a token which is only of interest to the test suite.
|
||||
pub fn perform_void(&self, handler: impl FnOnce() + 'static + Send) -> NonZeroU64 {
|
||||
self.perform_inner(Box::new(handler))
|
||||
}
|
||||
|
||||
/// Enqueue `handler` to be performed on a background thread with a function to
|
||||
/// If a function is already enqueued, this overwrites it and that function
|
||||
/// will not be executed.
|
||||
///
|
||||
/// The result is a token which is only of interest to the test suite.
|
||||
pub fn perform<Handler>(&self, handler: Handler) -> NonZeroU64
|
||||
where
|
||||
Handler: FnOnce() -> R + 'static + Send,
|
||||
{
|
||||
let data = Arc::clone(&self.data);
|
||||
let event_signaller = Arc::clone(&self.event_signaller);
|
||||
let work_item = Box::new(move || {
|
||||
let result = handler();
|
||||
// Store the result and signal its availability.
|
||||
data.lock().unwrap().result = Some(result);
|
||||
event_signaller.post();
|
||||
});
|
||||
self.perform_inner(work_item)
|
||||
}
|
||||
|
||||
fn perform_inner(&self, work_item: WorkItem) -> NonZeroU64 {
|
||||
let mut spawn = false;
|
||||
let active_token = {
|
||||
let mut data = self.data.lock().expect("Mutex poisoned!");
|
||||
data.next_req = Some(work_item);
|
||||
// If we have a timeout and our running thread has exceeded it, abandon that thread.
|
||||
if data.active_token.is_some()
|
||||
&& !self.timeout.is_zero()
|
||||
&& (Instant::now() - data.start_time > self.timeout)
|
||||
{
|
||||
// Abandon this thread by dissociating its token from this [`Debounce`] instance.
|
||||
data.active_token = None;
|
||||
}
|
||||
if data.active_token.is_none() {
|
||||
// We need to spawn a new thread. Mark the current time so that a new request won't
|
||||
// immediately abandon us and start a new thread too.
|
||||
spawn = true;
|
||||
data.active_token = Some(data.next_token);
|
||||
data.next_token = data.next_token.checked_add(1).unwrap();
|
||||
data.start_time = Instant::now();
|
||||
}
|
||||
data.active_token.expect("Something should be active now.")
|
||||
};
|
||||
|
||||
// Spawn after unlocking the mutex above.
|
||||
if spawn {
|
||||
// We need to clone the Arc to get it to last for the duration of the 'static lifetime.
|
||||
let data = Arc::clone(&self.data);
|
||||
self.pool.perform(move || {
|
||||
while Self::run_next(&data, active_token) {
|
||||
// Keep thread alive/busy.
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
active_token
|
||||
}
|
||||
|
||||
/// Take the result if available.
|
||||
pub fn take_result(&mut self) -> Option<R> {
|
||||
self.data.lock().unwrap().result.take()
|
||||
}
|
||||
|
||||
/// Take the result, waiting up to `timeout` for a result to be available.
|
||||
pub fn take_result_with_timeout(&mut self, timeout: Duration) -> Option<R> {
|
||||
let timeout = fd_readable_set::Timeout::Duration(timeout);
|
||||
if fd_readable_set::is_fd_readable(self.event_signaller.read_fd(), timeout) {
|
||||
self.take_result()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Debounce, ThreadPool};
|
||||
use crate::fd_monitor::FdEventSignaller;
|
||||
use crate::global_safety::RelaxedAtomicBool;
|
||||
|
||||
use std::sync::{
|
||||
Arc, Condvar, Mutex,
|
||||
atomic::{AtomicU32, Ordering},
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
#[test]
|
||||
fn test_debounce() {
|
||||
let pool = ThreadPool::new(1, 16);
|
||||
let event_signaller = Arc::new(FdEventSignaller::new());
|
||||
// Run 8 functions using a condition variable.
|
||||
// Only the first and last should run.
|
||||
let mut db = Debounce::new(&pool, &event_signaller, Duration::from_secs(0));
|
||||
const COUNT: usize = 8;
|
||||
|
||||
let mut result_ready: [bool; COUNT] = Default::default();
|
||||
|
||||
struct Context {
|
||||
handler_ran: [RelaxedAtomicBool; COUNT],
|
||||
ready_to_go: Mutex<bool>,
|
||||
cv: Condvar,
|
||||
}
|
||||
|
||||
let ctx = Arc::new(Context {
|
||||
handler_ran: std::array::from_fn(|_i| RelaxedAtomicBool::new(false)),
|
||||
ready_to_go: Mutex::new(false),
|
||||
cv: Condvar::new(),
|
||||
});
|
||||
|
||||
// "Enqueue" all functions. Each one waits until ready_to_go.
|
||||
for idx in 0..COUNT {
|
||||
assert!(!ctx.handler_ran[idx].load());
|
||||
let performer = {
|
||||
let ctx = ctx.clone();
|
||||
move || {
|
||||
let guard = ctx.ready_to_go.lock().unwrap();
|
||||
let _guard = ctx.cv.wait_while(guard, |ready| !*ready).unwrap();
|
||||
ctx.handler_ran[idx].store(true);
|
||||
idx
|
||||
}
|
||||
};
|
||||
db.perform(performer);
|
||||
}
|
||||
|
||||
// We're ready to go.
|
||||
*ctx.ready_to_go.lock().unwrap() = true;
|
||||
ctx.cv.notify_all();
|
||||
|
||||
// Wait until the last result is ready.
|
||||
while !result_ready.last().unwrap() {
|
||||
if let Some(result_idx) = db.take_result() {
|
||||
result_ready[result_idx] = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Each perform() call may displace an existing queued operation.
|
||||
// Each operation waits until all are queued.
|
||||
// Therefore we expect the last perform() to have run, and at most one more.
|
||||
assert!(ctx.handler_ran.last().unwrap().load());
|
||||
assert!(result_ready.last().unwrap());
|
||||
|
||||
let mut total_ran = 0;
|
||||
for idx in 0..COUNT {
|
||||
if ctx.handler_ran[idx].load() {
|
||||
total_ran += 1;
|
||||
}
|
||||
}
|
||||
assert!(total_ran <= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_debounce_timeout() {
|
||||
// Verify that debounce doesn't wait forever.
|
||||
let pool = ThreadPool::new(1, 16);
|
||||
let event_signaller = Arc::new(FdEventSignaller::new());
|
||||
let timeout = Duration::from_millis(500);
|
||||
let db = Debounce::new(&pool, &event_signaller, timeout);
|
||||
|
||||
struct Data {
|
||||
db: Debounce<usize>,
|
||||
exit_ok: Mutex<bool>,
|
||||
cv: Condvar,
|
||||
running: AtomicU32,
|
||||
}
|
||||
|
||||
let data = Arc::new(Data {
|
||||
db,
|
||||
exit_ok: Mutex::new(false),
|
||||
cv: Condvar::new(),
|
||||
running: AtomicU32::new(0),
|
||||
});
|
||||
|
||||
// Our background handler. Note this just blocks until exit_ok is set.
|
||||
let handler = {
|
||||
let data = data.clone();
|
||||
move || {
|
||||
data.running.fetch_add(1, Ordering::Relaxed);
|
||||
let guard = data.exit_ok.lock().unwrap();
|
||||
let _guard = data.cv.wait_while(guard, |exit_ok| !*exit_ok);
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the handler twice. This should not modify the thread token.
|
||||
let token1 = data.db.perform_void(handler.clone());
|
||||
let token2 = data.db.perform_void(handler.clone());
|
||||
assert_eq!(token1, token2);
|
||||
|
||||
// Wait 75 msec, then enqueue something else; this should spawn a new thread.
|
||||
std::thread::sleep(timeout + timeout / 2);
|
||||
assert!(data.running.load(Ordering::Relaxed) == 1);
|
||||
let token3 = data.db.perform_void(handler);
|
||||
assert!(token3 > token2);
|
||||
|
||||
// Release all the threads.
|
||||
let mut exit_ok = data.exit_ok.lock().unwrap();
|
||||
*exit_ok = true;
|
||||
data.cv.notify_all();
|
||||
}
|
||||
}
|
||||
5
src/threads/mod.rs
Normal file
5
src/threads/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
||||
pub mod debounce;
|
||||
#[allow(clippy::module_inception)]
|
||||
pub mod threads;
|
||||
|
||||
pub use threads::*;
|
||||
@@ -1,14 +1,10 @@
|
||||
//! The rusty version of iothreads from the cpp code, to be consumed by native rust code. This isn't
|
||||
//! ported directly from the cpp code so we can use rust threads instead of using pthreads.
|
||||
|
||||
//! Support for thread pools and thread management.
|
||||
use crate::flog::{FLOG, FloggableDebug};
|
||||
use crate::reader::Reader;
|
||||
use std::marker::PhantomData;
|
||||
use std::mem::MaybeUninit;
|
||||
use std::num::NonZeroU64;
|
||||
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
|
||||
use std::sync::{Arc, Mutex, OnceLock};
|
||||
use std::time::{Duration, Instant};
|
||||
use std::time::Duration;
|
||||
|
||||
impl FloggableDebug for std::thread::ThreadId {}
|
||||
|
||||
@@ -19,39 +15,13 @@ impl FloggableDebug for std::thread::ThreadId {}
|
||||
/// This allows us to notice when we've forked.
|
||||
static IS_FORKED_PROC: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
/// Maximum number of threads for the IO thread pool.
|
||||
const IO_MAX_THREADS: usize = 1024;
|
||||
|
||||
/// How long an idle [`ThreadPool`] thread will wait for work (against the condition variable)
|
||||
/// before exiting.
|
||||
const IO_WAIT_FOR_WORK_DURATION: Duration = Duration::from_millis(500);
|
||||
|
||||
/// The iothreads [`ThreadPool`] singleton. Used to lift I/O off of the main thread and used for
|
||||
/// completions, etc.
|
||||
static IO_THREAD_POOL: OnceLock<Mutex<ThreadPool>> = OnceLock::new();
|
||||
|
||||
/// The event signaller singleton used for completions and queued main thread requests.
|
||||
static NOTIFY_SIGNALLER: once_cell::sync::Lazy<crate::fd_monitor::FdEventSignaller> =
|
||||
once_cell::sync::Lazy::new(crate::fd_monitor::FdEventSignaller::new);
|
||||
|
||||
/// A [`ThreadPool`] work request.
|
||||
type WorkItem = Box<dyn FnOnce() + 'static + Send>;
|
||||
|
||||
// A helper type to allow us to (temporarily) send an object to another thread.
|
||||
struct ForceSend<T>(T);
|
||||
|
||||
// Safety: only used on main thread.
|
||||
unsafe impl<T> Send for ForceSend<T> {}
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
type DebounceCallback = ForceSend<Box<dyn FnOnce(&mut Reader) + 'static>>;
|
||||
|
||||
/// The queue of [`WorkItem`]s to be executed on the main thread. This is read from in
|
||||
/// `iothread_service_main()`.
|
||||
///
|
||||
/// Since the queue is synchronized, items don't need to implement `Send`.
|
||||
static MAIN_THREAD_QUEUE: Mutex<Vec<DebounceCallback>> = Mutex::new(Vec::new());
|
||||
|
||||
/// Initialize some global static variables. Must be called at startup from the main thread.
|
||||
pub fn init() {
|
||||
MAIN_THREAD_ID
|
||||
@@ -65,10 +35,6 @@ extern "C" fn child_post_fork() {
|
||||
let result = libc::pthread_atfork(None, None, Some(child_post_fork));
|
||||
assert_eq!(result, 0, "pthread_atfork() failure: {}", errno::errno());
|
||||
}
|
||||
|
||||
IO_THREAD_POOL
|
||||
.set(Mutex::new(ThreadPool::new(1, IO_MAX_THREADS)))
|
||||
.expect("IO_THREAD_POOL has already been initialized!");
|
||||
}
|
||||
|
||||
#[inline(always)]
|
||||
@@ -154,11 +120,7 @@ fn panic_is_forked_child() {
|
||||
}
|
||||
}
|
||||
|
||||
/// The rusty version of `iothreads::make_detached_pthread()`. We will probably need a
|
||||
/// `spawn_scoped` version of the same to handle some more advanced borrow cases safely, and maybe
|
||||
/// an unsafe version that doesn't do any lifetime checking akin to
|
||||
/// `spawn_unchecked()`[std::thread::Builder::spawn_unchecked], which is a nightly-only feature.
|
||||
///
|
||||
/// Spawn a new thread to run the given callback.
|
||||
/// Returns a boolean indicating whether or not the thread was successfully launched. Failure here
|
||||
/// is not dependent on the passed callback and implies a system error (likely insufficient
|
||||
/// resources).
|
||||
@@ -189,7 +151,6 @@ pub fn spawn<F: FnOnce() + Send + 'static>(callback: F) -> bool {
|
||||
// extant requests. So we can ignore failure with some confidence.
|
||||
// We don't have to port the PTHREAD_CREATE_DETACHED logic. Rust threads are detached
|
||||
// automatically if the returned join handle is dropped.
|
||||
|
||||
let result = match std::thread::Builder::new().spawn(callback) {
|
||||
Ok(handle) => {
|
||||
let thread_id = thread_id();
|
||||
@@ -240,19 +201,12 @@ struct ThreadPoolProtected {
|
||||
pub waiting_threads: usize,
|
||||
}
|
||||
|
||||
/// Data behind an [`Arc`] to share between the [`ThreadPool`] and [`WorkerThread`] instances.
|
||||
#[derive(Default)]
|
||||
struct ThreadPoolShared {
|
||||
pub struct ThreadPool {
|
||||
/// The mutex to access shared state between [`ThreadPool`] and [`WorkerThread`] instances. This
|
||||
/// is accessed both standalone and via [`cond_var`](Self::cond_var).
|
||||
mutex: Mutex<ThreadPoolProtected>,
|
||||
shared: Mutex<ThreadPoolProtected>,
|
||||
/// The condition variable used to wake up waiting threads. This is tied to [`mutex`](Self::mutex).
|
||||
cond_var: std::sync::Condvar,
|
||||
}
|
||||
|
||||
pub struct ThreadPool {
|
||||
/// The data which needs to be shared with worker threads.
|
||||
shared: Arc<ThreadPoolShared>,
|
||||
/// The minimum number of threads that will be kept waiting even when idle in the pool.
|
||||
soft_min_threads: usize,
|
||||
/// The maximum number of threads that will be created to service outstanding work requests, by
|
||||
@@ -271,26 +225,22 @@ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
|
||||
impl ThreadPool {
|
||||
/// Construct a new `ThreadPool` instance with the specified min and max num of threads.
|
||||
pub fn new(soft_min_threads: usize, max_threads: usize) -> Self {
|
||||
ThreadPool {
|
||||
pub fn new(soft_min_threads: usize, max_threads: usize) -> Arc<Self> {
|
||||
Arc::new(ThreadPool {
|
||||
shared: Default::default(),
|
||||
cond_var: Default::default(),
|
||||
soft_min_threads,
|
||||
max_threads,
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Enqueue a new work item onto the thread pool.
|
||||
///
|
||||
/// The function `func` will execute on one of the pool's background threads. If `cant_wait` is
|
||||
/// set, the thread limit may be disregarded if extant threads are busy.
|
||||
/// The function `func` will execute on one of the pool's background threads.
|
||||
///
|
||||
/// Returns the number of threads that were alive when the work item was enqueued.
|
||||
pub fn perform<F: FnOnce() + 'static + Send>(&mut self, func: F, cant_wait: bool) -> usize {
|
||||
pub fn perform<F: FnOnce() + 'static + Send>(self: &Arc<Self>, func: F) -> usize {
|
||||
let work_item = Box::new(func);
|
||||
self.perform_inner(work_item, cant_wait)
|
||||
}
|
||||
|
||||
fn perform_inner(&mut self, f: WorkItem, cant_wait: bool) -> usize {
|
||||
enum ThreadAction {
|
||||
None,
|
||||
Wake,
|
||||
@@ -299,9 +249,9 @@ enum ThreadAction {
|
||||
|
||||
let local_thread_count;
|
||||
let thread_action = {
|
||||
let mut data = self.shared.mutex.lock().expect("Mutex poisoned!");
|
||||
let mut data = self.shared.lock().expect("Mutex poisoned!");
|
||||
local_thread_count = data.total_threads;
|
||||
data.request_queue.push_back(f);
|
||||
data.request_queue.push_back(work_item);
|
||||
FLOG!(
|
||||
iothread,
|
||||
"enqueuing work item (count is ",
|
||||
@@ -311,7 +261,7 @@ enum ThreadAction {
|
||||
if data.waiting_threads >= data.request_queue.len() {
|
||||
// There are enough waiting threads, wake one up.
|
||||
ThreadAction::Wake
|
||||
} else if cant_wait || data.total_threads < self.max_threads {
|
||||
} else if data.total_threads < self.max_threads {
|
||||
// No threads are idle waiting but we can or must spawn a new thread to service the
|
||||
// request.
|
||||
data.total_threads += 1;
|
||||
@@ -328,7 +278,7 @@ enum ThreadAction {
|
||||
ThreadAction::Wake => {
|
||||
// Wake a thread if we decided to do so.
|
||||
FLOG!(iothread, "notifying thread ", std::thread::current().id());
|
||||
self.shared.cond_var.notify_one();
|
||||
self.cond_var.notify_one();
|
||||
}
|
||||
ThreadAction::Spawn => {
|
||||
// Spawn a thread. If this fails, it means there are already a bunch of worker
|
||||
@@ -340,11 +290,7 @@ enum ThreadAction {
|
||||
FLOG!(iothread, "pthread spawned");
|
||||
} else {
|
||||
// We failed to spawn a thread; decrement the thread count.
|
||||
self.shared
|
||||
.mutex
|
||||
.lock()
|
||||
.expect("Mutex poisoned!")
|
||||
.total_threads -= 1;
|
||||
self.shared.lock().expect("Mutex poisoned!").total_threads -= 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -353,16 +299,10 @@ enum ThreadAction {
|
||||
}
|
||||
|
||||
/// Attempt to spawn a new worker thread.
|
||||
fn spawn_thread(&mut self) -> bool {
|
||||
let shared = Arc::clone(&self.shared);
|
||||
let soft_min_threads = self.soft_min_threads;
|
||||
fn spawn_thread(self: &Arc<Self>) -> bool {
|
||||
let pool = Arc::clone(self);
|
||||
self::spawn(move || {
|
||||
let worker = WorkerThread {
|
||||
shared,
|
||||
soft_min_threads,
|
||||
};
|
||||
|
||||
worker.run();
|
||||
pool.run_worker();
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -394,16 +334,10 @@ pub fn get(&self) -> &T {
|
||||
}
|
||||
}
|
||||
|
||||
pub struct WorkerThread {
|
||||
/// The data shared with the [`ThreadPool`].
|
||||
shared: Arc<ThreadPoolShared>,
|
||||
/// The soft min number of threads for the associated [`ThreadPool`].
|
||||
soft_min_threads: usize,
|
||||
}
|
||||
|
||||
impl WorkerThread {
|
||||
impl ThreadPool {
|
||||
/// The worker loop entry point for this thread.
|
||||
fn run(mut self) {
|
||||
/// This is run in a background thread.
|
||||
fn run_worker(&self) {
|
||||
while let Some(work_item) = self.dequeue_work_or_commit_to_exit() {
|
||||
FLOG!(
|
||||
iothread,
|
||||
@@ -426,8 +360,8 @@ fn run(mut self) {
|
||||
|
||||
/// Dequeue a work item (perhaps waiting on the condition variable) or commit to exiting by
|
||||
/// reducing the active thread count.
|
||||
fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
|
||||
let mut data = self.shared.mutex.lock().expect("Mutex poisoned!");
|
||||
fn dequeue_work_or_commit_to_exit(&self) -> Option<WorkItem> {
|
||||
let mut data = self.shared.lock().expect("Mutex poisoned!");
|
||||
|
||||
// If the queue is empty, check to see if we should wait. We should wait if our exiting
|
||||
// would drop us below our soft thread count minimum.
|
||||
@@ -437,7 +371,6 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
|
||||
{
|
||||
data.waiting_threads += 1;
|
||||
data = self
|
||||
.shared
|
||||
.cond_var
|
||||
.wait_timeout(data, IO_WAIT_FOR_WORK_DURATION)
|
||||
.expect("Mutex poisoned!")
|
||||
@@ -456,229 +389,18 @@ fn dequeue_work_or_commit_to_exit(&mut self) -> Option<WorkItem> {
|
||||
data.total_threads -= 1;
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns a [`MutexGuard`](std::sync::MutexGuard) containing the IO [`ThreadPool`].
|
||||
fn borrow_io_thread_pool() -> std::sync::MutexGuard<'static, ThreadPool> {
|
||||
IO_THREAD_POOL
|
||||
.get()
|
||||
.unwrap()
|
||||
.lock()
|
||||
.expect("Mutex poisoned!")
|
||||
}
|
||||
|
||||
/// Enqueues work on the IO thread pool singleton.
|
||||
pub fn iothread_perform(f: impl FnOnce() + 'static + Send) {
|
||||
let mut thread_pool = borrow_io_thread_pool();
|
||||
thread_pool.perform(f, false);
|
||||
}
|
||||
|
||||
/// Enqueues priority work on the IO thread pool singleton, disregarding the thread limit.
|
||||
///
|
||||
/// It does its best to spawn a thread if all other threads are occupied. This is primarily for
|
||||
/// cases where deferring creation of a new thread might lead to a deadlock.
|
||||
pub fn iothread_perform_cant_wait(f: impl FnOnce() + 'static + Send) {
|
||||
let mut thread_pool = borrow_io_thread_pool();
|
||||
thread_pool.perform(f, true);
|
||||
}
|
||||
|
||||
pub fn iothread_port() -> i32 {
|
||||
NOTIFY_SIGNALLER.read_fd()
|
||||
}
|
||||
|
||||
pub fn iothread_service_main_with_timeout(ctx: &mut Reader, timeout: Duration) {
|
||||
use crate::fd_readable_set::Timeout;
|
||||
|
||||
if crate::fd_readable_set::is_fd_readable(iothread_port(), Timeout::Duration(timeout)) {
|
||||
iothread_service_main(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn iothread_service_main(ctx: &mut Reader) {
|
||||
self::assert_is_main_thread();
|
||||
|
||||
// Note: the order here is important. We must consume events before handling requests, as
|
||||
// posting uses the opposite order.
|
||||
NOTIFY_SIGNALLER.try_consume();
|
||||
|
||||
let queue = std::mem::take(&mut *MAIN_THREAD_QUEUE.lock().expect("Mutex poisoned!"));
|
||||
|
||||
// Perform each completion in order.
|
||||
for callback in queue {
|
||||
(callback.0)(ctx);
|
||||
}
|
||||
}
|
||||
|
||||
/// Does nasty polling via select(), only used for testing.
|
||||
#[cfg(test)]
|
||||
pub(crate) fn iothread_drain_all(ctx: &mut Reader) {
|
||||
while borrow_io_thread_pool()
|
||||
.shared
|
||||
.mutex
|
||||
.lock()
|
||||
.expect("Mutex poisoned!")
|
||||
.total_threads
|
||||
> 0
|
||||
{
|
||||
iothread_service_main_with_timeout(ctx, Duration::from_millis(1000));
|
||||
}
|
||||
}
|
||||
|
||||
/// `Debounce` is a simple class which executes one function on a background thread while enqueuing
|
||||
/// at most one more. Subsequent execution requests overwrite the enqueued one. It takes an optional
|
||||
/// timeout; if a handler does not finish within the timeout then a new thread is spawned to service
|
||||
/// the remaining request.
|
||||
///
|
||||
/// Debounce implementation note: we would like to enqueue at most one request, except if a thread
|
||||
/// hangs (e.g. on fs access) then we do not want to block indefinitely - such threads are called
|
||||
/// "abandoned". This is implemented via a monotone uint64 counter, called a token. Every time we
|
||||
/// spawn a thread, we increment the token. When the thread has completed running a work item, it
|
||||
/// compares its token to the active token; if they differ then this thread was abandoned.
|
||||
#[derive(Clone)]
|
||||
pub struct Debounce {
|
||||
timeout: Duration,
|
||||
/// The data shared between [`Debounce`] instances.
|
||||
data: Arc<Mutex<DebounceData>>,
|
||||
}
|
||||
|
||||
/// The data shared between [`Debounce`] instances.
|
||||
struct DebounceData {
|
||||
/// The (one or none) next enqueued request, overwritten each time a new call to
|
||||
/// [`Debounce::perform()`] is made.
|
||||
next_req: Option<WorkItem>,
|
||||
/// The non-zero token of the current non-abandoned thread or `None` if no thread is running.
|
||||
active_token: Option<NonZeroU64>,
|
||||
/// The next token to use when spawning a thread.
|
||||
next_token: NonZeroU64,
|
||||
/// The start time of the most recently spawned thread or request (if any).
|
||||
start_time: Instant,
|
||||
}
|
||||
|
||||
impl Debounce {
|
||||
pub fn new(timeout: Duration) -> Self {
|
||||
Self {
|
||||
timeout,
|
||||
data: Arc::new(Mutex::new(DebounceData {
|
||||
next_req: None,
|
||||
active_token: None,
|
||||
next_token: NonZeroU64::new(1).unwrap(),
|
||||
start_time: Instant::now(),
|
||||
})),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run an iteration in the background with the given thread token. Returns `true` if we handled
|
||||
/// a request or `false` if there were no requests to handle (in which case the debounce thread
|
||||
/// exits).
|
||||
///
|
||||
/// Note that this method is called from a background thread.
|
||||
fn run_next(&self, token: NonZeroU64) -> bool {
|
||||
let request = {
|
||||
let mut data = self.data.lock().expect("Mutex poisoned!");
|
||||
if let Some(req) = data.next_req.take() {
|
||||
data.start_time = Instant::now();
|
||||
req
|
||||
} else {
|
||||
// There is no pending request. Mark this token as no longer running.
|
||||
if Some(token) == data.active_token {
|
||||
data.active_token = None;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
// Execute request after unlocking the mutex.
|
||||
(request)();
|
||||
return true;
|
||||
}
|
||||
|
||||
/// Enqueue `handler` to be performed on a background thread. If another function is already
|
||||
/// enqueued, this overwrites it and that function will not be executed.
|
||||
///
|
||||
/// The result is a token which is only of interest to the test suite.
|
||||
pub fn perform(&self, handler: impl FnOnce() + 'static + Send) -> NonZeroU64 {
|
||||
self.perform_with_completion(handler, |_ctx, _result| ())
|
||||
}
|
||||
|
||||
/// Enqueue `handler` to be performed on a background thread with `completion` to be performed
|
||||
/// on the main thread. If a function is already enqueued, this overwrites it and that function
|
||||
/// will not be executed.
|
||||
///
|
||||
/// If the function executes within the optional timeout then `completion` will be invoked on
|
||||
/// the main thread with the result of the evaluated `handler`.
|
||||
///
|
||||
/// The result is a token which is only of interest to the test suite.
|
||||
pub fn perform_with_completion<H, R, C>(&self, handler: H, completion: C) -> NonZeroU64
|
||||
where
|
||||
H: FnOnce() -> R + 'static + Send,
|
||||
C: FnOnce(&mut Reader, R) + 'static,
|
||||
R: 'static + Send,
|
||||
{
|
||||
assert_is_main_thread();
|
||||
let completion_wrapper = ForceSend(completion);
|
||||
let work_item = Box::new(move || {
|
||||
let result = handler();
|
||||
let callback: DebounceCallback = ForceSend(Box::new(move |ctx| {
|
||||
let completion = completion_wrapper;
|
||||
(completion.0)(ctx, result);
|
||||
}));
|
||||
MAIN_THREAD_QUEUE.lock().unwrap().push(callback);
|
||||
NOTIFY_SIGNALLER.post();
|
||||
});
|
||||
self.perform_inner(work_item)
|
||||
}
|
||||
|
||||
fn perform_inner(&self, work_item: WorkItem) -> NonZeroU64 {
|
||||
let mut spawn = false;
|
||||
let active_token = {
|
||||
let mut data = self.data.lock().expect("Mutex poisoned!");
|
||||
data.next_req = Some(work_item);
|
||||
// If we have a timeout and our running thread has exceeded it, abandon that thread.
|
||||
if data.active_token.is_some()
|
||||
&& !self.timeout.is_zero()
|
||||
&& (Instant::now() - data.start_time > self.timeout)
|
||||
{
|
||||
// Abandon this thread by dissociating its token from this [`Debounce`] instance.
|
||||
data.active_token = None;
|
||||
}
|
||||
if data.active_token.is_none() {
|
||||
// We need to spawn a new thread. Mark the current time so that a new request won't
|
||||
// immediately abandon us and start a new thread too.
|
||||
spawn = true;
|
||||
data.active_token = Some(data.next_token);
|
||||
data.next_token = data.next_token.checked_add(1).unwrap();
|
||||
data.start_time = Instant::now();
|
||||
}
|
||||
data.active_token.expect("Something should be active now.")
|
||||
};
|
||||
|
||||
// Spawn after unlocking the mutex above.
|
||||
if spawn {
|
||||
// We need to clone the Arc to get it to last for the duration of the 'static lifetime.
|
||||
let debounce = self.clone();
|
||||
iothread_perform(move || {
|
||||
while debounce.run_next(active_token) {
|
||||
// Keep thread alive/busy.
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
active_token
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::{Debounce, iothread_drain_all, iothread_service_main, spawn, thread_id};
|
||||
use crate::global_safety::RelaxedAtomicBool;
|
||||
use crate::reader::{Reader, fake_scoped_reader};
|
||||
use crate::tests::prelude::*;
|
||||
use super::{spawn, thread_id};
|
||||
|
||||
use std::mem::MaybeUninit;
|
||||
use std::sync::{
|
||||
Arc, Condvar, Mutex,
|
||||
atomic::{AtomicI32, AtomicU32, Ordering},
|
||||
atomic::{AtomicI32, Ordering},
|
||||
};
|
||||
use std::time::Duration;
|
||||
|
||||
@@ -794,125 +516,4 @@ struct Context {
|
||||
));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_debounce() {
|
||||
let _cleanup = test_init();
|
||||
let parser = TestParser::new();
|
||||
// Run 8 functions using a condition variable.
|
||||
// Only the first and last should run.
|
||||
let db = Debounce::new(Duration::from_secs(0));
|
||||
const count: usize = 8;
|
||||
|
||||
struct Context {
|
||||
handler_ran: [RelaxedAtomicBool; count],
|
||||
completion_ran: [RelaxedAtomicBool; count],
|
||||
ready_to_go: Mutex<bool>,
|
||||
cv: Condvar,
|
||||
}
|
||||
|
||||
let ctx = Arc::new(Context {
|
||||
handler_ran: std::array::from_fn(|_i| RelaxedAtomicBool::new(false)),
|
||||
completion_ran: std::array::from_fn(|_i| RelaxedAtomicBool::new(false)),
|
||||
ready_to_go: Mutex::new(false),
|
||||
cv: Condvar::new(),
|
||||
});
|
||||
|
||||
// "Enqueue" all functions. Each one waits until ready_to_go.
|
||||
for idx in 0..count {
|
||||
assert!(!ctx.handler_ran[idx].load());
|
||||
let performer = {
|
||||
let ctx = ctx.clone();
|
||||
move || {
|
||||
let guard = ctx.ready_to_go.lock().unwrap();
|
||||
let _guard = ctx.cv.wait_while(guard, |ready| !*ready).unwrap();
|
||||
ctx.handler_ran[idx].store(true);
|
||||
idx
|
||||
}
|
||||
};
|
||||
let completer = {
|
||||
let ctx = ctx.clone();
|
||||
move |_ctx: &mut Reader, idx: usize| {
|
||||
ctx.completion_ran[idx].store(true);
|
||||
}
|
||||
};
|
||||
db.perform_with_completion(performer, completer);
|
||||
}
|
||||
|
||||
// We're ready to go.
|
||||
*ctx.ready_to_go.lock().unwrap() = true;
|
||||
ctx.cv.notify_all();
|
||||
|
||||
// Wait until the last completion is done.
|
||||
let mut reader = fake_scoped_reader(&parser);
|
||||
while !ctx.completion_ran.last().unwrap().load() {
|
||||
iothread_service_main(&mut reader);
|
||||
}
|
||||
iothread_drain_all(&mut reader);
|
||||
|
||||
// Each perform() call may displace an existing queued operation.
|
||||
// Each operation waits until all are queued.
|
||||
// Therefore we expect the last perform() to have run, and at most one more.
|
||||
assert!(ctx.handler_ran.last().unwrap().load());
|
||||
assert!(ctx.completion_ran.last().unwrap().load());
|
||||
|
||||
let mut total_ran = 0;
|
||||
for idx in 0..count {
|
||||
if ctx.handler_ran[idx].load() {
|
||||
total_ran += 1;
|
||||
}
|
||||
assert_eq!(ctx.handler_ran[idx].load(), ctx.completion_ran[idx].load());
|
||||
}
|
||||
assert!(total_ran <= 2);
|
||||
}
|
||||
|
||||
#[test]
|
||||
#[serial]
|
||||
fn test_debounce_timeout() {
|
||||
let _cleanup = test_init();
|
||||
// Verify that debounce doesn't wait forever.
|
||||
// Use a shared_ptr so we don't have to join our threads.
|
||||
let timeout = Duration::from_millis(500);
|
||||
|
||||
struct Data {
|
||||
db: Debounce,
|
||||
exit_ok: Mutex<bool>,
|
||||
cv: Condvar,
|
||||
running: AtomicU32,
|
||||
}
|
||||
|
||||
let data = Arc::new(Data {
|
||||
db: Debounce::new(timeout),
|
||||
exit_ok: Mutex::new(false),
|
||||
cv: Condvar::new(),
|
||||
running: AtomicU32::new(0),
|
||||
});
|
||||
|
||||
// Our background handler. Note this just blocks until exit_ok is set.
|
||||
let handler = {
|
||||
let data = data.clone();
|
||||
move || {
|
||||
data.running.fetch_add(1, Ordering::Relaxed);
|
||||
let guard = data.exit_ok.lock().unwrap();
|
||||
let _guard = data.cv.wait_while(guard, |exit_ok| !*exit_ok);
|
||||
}
|
||||
};
|
||||
|
||||
// Spawn the handler twice. This should not modify the thread token.
|
||||
let token1 = data.db.perform(handler.clone());
|
||||
let token2 = data.db.perform(handler.clone());
|
||||
assert_eq!(token1, token2);
|
||||
|
||||
// Wait 75 msec, then enqueue something else; this should spawn a new thread.
|
||||
std::thread::sleep(timeout + timeout / 2);
|
||||
assert!(data.running.load(Ordering::Relaxed) == 1);
|
||||
let token3 = data.db.perform(handler);
|
||||
assert!(token3 > token2);
|
||||
|
||||
// Release all the threads.
|
||||
let mut exit_ok = data.exit_ok.lock().unwrap();
|
||||
*exit_ok = true;
|
||||
data.cv.notify_all();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user