From b7ae159824a586dcf26aae1c5774073592a2eaf3 Mon Sep 17 00:00:00 2001 From: Peter Ammon Date: Sun, 22 Dec 2024 17:13:49 -0800 Subject: [PATCH] Remove the ability for FdMonitorItems to have timeouts FdMonitor is used to monitor a set of file descriptors and invoke a callback when one becomes readable. Prior to this commit, they coudl also have the callback invoked on timeout. fish used to use this feature but no longer does; remove it. --- src/fd_monitor.rs | 78 +++++++---------------------------------- src/io.rs | 2 +- src/tests/fd_monitor.rs | 72 ++++++++++--------------------------- 3 files changed, 32 insertions(+), 120 deletions(-) diff --git a/src/fd_monitor.rs b/src/fd_monitor.rs index bca8de730..abf91168a 100644 --- a/src/fd_monitor.rs +++ b/src/fd_monitor.rs @@ -5,7 +5,7 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::{Arc, Mutex, Weak}; -use std::time::{Duration, Instant}; +use std::time::Duration; use crate::common::exit_without_destructors; use crate::fd_readable_set::FdReadableSet; @@ -26,8 +26,6 @@ pub enum ItemWakeReason { /// The fd became readable (or was HUP'd) Readable, - /// The requested timeout was hit - Timeout, /// The item was "poked" (woken up explicitly) Poke, } @@ -209,11 +207,6 @@ pub struct FdMonitorItem { /// A callback to be invoked when the fd is readable, or for another reason given by the wake reason. /// If the fd is invalid on return from the function, then the item is removed from the [`FdMonitor`] set. callback: Callback, - /// The timeout associated with waiting on this item or `None` to wait indefinitely. A timeout - /// of `0` is not supported. - timeout: Option, - /// The last time we were called or the time of initialization. - last_time: Option, /// The id for this item, assigned by [`FdMonitor`]. item_id: FdMonitorItemId, } @@ -226,35 +219,14 @@ pub enum ItemAction { } impl FdMonitorItem { - /// Return the duration until the timeout should trigger or `None`. A return of `0` means we are - /// at or past the timeout. - fn remaining_time(&self, now: &Instant) -> Option { - let last_time = self.last_time.expect("Should always have a last_time!"); - let timeout = self.timeout?; - assert!(now >= &last_time, "Steady clock went backwards or bug!"); - let since = *now - last_time; - Some(if since >= timeout { - Duration::ZERO - } else { - timeout - since - }) - } - - /// Invoke this item's callback if its value (when its value is set in the fd or has timed out). + /// Invoke this item's callback if its fd is readable. /// Returns `true` if the item should be retained or `false` if it should be removed from the /// set. - fn service_item(&mut self, fds: &FdReadableSet, now: &Instant) -> ItemAction { + fn service_item(&mut self, fds: &FdReadableSet) -> ItemAction { let mut result = ItemAction::Retain; let readable = fds.test(self.fd.as_raw_fd()); - let timed_out = !readable && self.remaining_time(now) == Some(Duration::ZERO); - if readable || timed_out { - self.last_time = Some(*now); - let reason = if readable { - ItemWakeReason::Readable - } else { - ItemWakeReason::Timeout - }; - result = (self.callback)(&mut self.fd, reason); + if readable { + result = (self.callback)(&mut self.fd, ItemWakeReason::Readable); } result } @@ -269,19 +241,17 @@ fn maybe_poke_item(&mut self, pokelist: &[FdMonitorItemId]) -> ItemAction { (self.callback)(&mut self.fd, ItemWakeReason::Poke) } - pub fn new(fd: AutoCloseFd, timeout: Option, callback: Callback) -> Self { + pub fn new(fd: AutoCloseFd, callback: Callback) -> Self { FdMonitorItem { fd, - timeout, callback, item_id: FdMonitorItemId(0), - last_time: None, } } } /// A thread-safe class which can monitor a set of fds, invoking a callback when any becomes -/// readable (or has been HUP'd) or when per-item-configurable timeouts are reached. +/// readable (or has been HUP'd). pub struct FdMonitor { /// Our self-signaller. When this is written to, it means there are new items pending, new items /// in the poke list, or terminate has been set. @@ -330,7 +300,6 @@ impl FdMonitor { /// Add an item to the monitor. Returns the [`FdMonitorItemId`] assigned to the item. pub fn add(&self, mut item: FdMonitorItem) -> FdMonitorItemId { assert!(item.fd.is_valid()); - assert!(item.timeout != Some(Duration::ZERO), "Invalid timeout!"); assert!( item.item_id == FdMonitorItemId(0), "Item should not already have an id!" @@ -423,16 +392,8 @@ fn run(mut self) { let change_signal_fd = self.change_signaller.upgrade().unwrap().read_fd(); fds.add(change_signal_fd); - let mut now = Instant::now(); - // Use Duration::MAX to represent no timeout for comparison purposes. - let mut timeout = Duration::MAX; - for item in &mut self.items { fds.add(item.fd.as_raw_fd()); - if item.last_time.is_none() { - item.last_time = Some(now); - } - timeout = timeout.min(item.timeout.unwrap_or(Duration::MAX)); } // If we have no items, then we wish to allow the thread to exit, but after a time, so @@ -440,18 +401,10 @@ fn run(mut self) { // msec; if nothing becomes readable by then we will exit. We refer to this as the // wait-lap. let is_wait_lap = self.items.is_empty(); - if is_wait_lap { - assert!( - timeout == Duration::MAX, - "Should not have a timeout on wait lap!" - ); - timeout = Duration::from_millis(256); - } - - // Don't leave Duration::MAX as an actual timeout value - let timeout = match timeout { - Duration::MAX => None, - timeout => Some(timeout), + let timeout = if is_wait_lap { + Some(Duration::from_millis(256)) + } else { + None }; // Call select() @@ -465,22 +418,17 @@ fn run(mut self) { perror("select"); } - // Update the value of `now` after waiting on `fds.check_readable()`; it's used in the - // servicer closure. - now = Instant::now(); - // A predicate which services each item in turn, returning true if it should be removed let servicer = |item: &mut FdMonitorItem| { let fd = item.fd.as_raw_fd(); - let action = item.service_item(&fds, &now); + let action = item.service_item(&fds); if action == ItemAction::Remove { FLOG!(fd_monitor, "Removing fd", fd); } action }; - // Service all items that are either readable or have timed out, and remove any which - // say to do so. + // Service all items that are readable, and remove any which say to do so. self.items .retain_mut(|item| servicer(item) == ItemAction::Retain); diff --git a/src/io.rs b/src/io.rs index 45d422346..306304443 100644 --- a/src/io.rs +++ b/src/io.rs @@ -609,7 +609,7 @@ fn begin_filling(iobuffer: &Arc, fd: OwnedFd) { }; let fd = AutoCloseFd::new(fd.into_raw_fd()); - let item_id = fd_monitor().add(FdMonitorItem::new(fd, None, item_callback)); + let item_id = fd_monitor().add(FdMonitorItem::new(fd, item_callback)); iobuffer.item_id.store(u64::from(item_id), Ordering::SeqCst); } diff --git a/src/tests/fd_monitor.rs b/src/tests/fd_monitor.rs index 0df3c40cf..7cfba0953 100644 --- a/src/tests/fd_monitor.rs +++ b/src/tests/fd_monitor.rs @@ -5,7 +5,7 @@ use std::os::fd::{AsRawFd, IntoRawFd}; #[cfg(target_has_atomic = "64")] use std::sync::atomic::AtomicU64; -use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -20,7 +20,6 @@ /// This could be structured differently to avoid the `Mutex` on `writer`, but it's not worth it /// since this is just used for test purposes. struct ItemMaker { - pub did_timeout: AtomicBool, pub length_read: AtomicUsize, pub pokes: AtomicUsize, pub total_calls: AtomicUsize, @@ -30,19 +29,14 @@ struct ItemMaker { } impl ItemMaker { - pub fn insert_new_into(monitor: &FdMonitor, timeout: Option) -> Arc { - Self::insert_new_into2(monitor, timeout, |_| {}) + pub fn insert_new_into(monitor: &FdMonitor) -> Arc { + Self::insert_new_into2(monitor, |_| {}) } - pub fn insert_new_into2( - monitor: &FdMonitor, - timeout: Option, - config: F, - ) -> Arc { + pub fn insert_new_into2(monitor: &FdMonitor, config: F) -> Arc { let pipes = make_autoclose_pipes().expect("fds exhausted!"); let mut result = ItemMaker { - did_timeout: false.into(), length_read: 0.into(), pokes: 0.into(), total_calls: 0.into(), @@ -59,7 +53,7 @@ pub fn insert_new_into2( move |fd: &mut AutoCloseFd, reason: ItemWakeReason| result.callback(fd, reason) }; let fd = AutoCloseFd::new(pipes.read.into_raw_fd()); - let item = FdMonitorItem::new(fd, timeout, Box::new(callback)); + let item = FdMonitorItem::new(fd, Box::new(callback)); let item_id = monitor.add(item); result.item_id.store(u64::from(item_id), Ordering::Relaxed); @@ -74,9 +68,6 @@ fn callback(&self, fd: &mut AutoCloseFd, reason: ItemWakeReason) -> ItemAction { let mut was_closed = false; match reason { - ItemWakeReason::Timeout => { - self.did_timeout.store(true, Ordering::Relaxed); - } ItemWakeReason::Poke => { self.pokes.fetch_add(1, Ordering::Relaxed); } @@ -116,34 +107,24 @@ fn fd_monitor_items() { let _cleanup = test_init(); let monitor = FdMonitor::new(); - // Items which will never receive data or be called. - let item_never = ItemMaker::insert_new_into(&monitor, None); - let item_huge_timeout = - ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(100_000_000))); + // Item which will never receive data or be called. + let item_never = ItemMaker::insert_new_into(&monitor); - // Item which should get no data and time out. - let item0_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); - - // Item which should get exactly 42 bytes then time out. - let item42_timeout = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); - - // Item which should get exactly 42 bytes and not time out. - let item42_no_timeout = ItemMaker::insert_new_into(&monitor, None); + // Item which should get exactly 42 bytes. + let item42 = ItemMaker::insert_new_into(&monitor); // Item which should get 42 bytes then get notified it is closed. - let item42_then_close = ItemMaker::insert_new_into(&monitor, Some(Duration::from_millis(16))); + let item42_then_close = ItemMaker::insert_new_into(&monitor); // Item which gets one poke. - let item_pokee = ItemMaker::insert_new_into(&monitor, None); + let item_pokee = ItemMaker::insert_new_into(&monitor); // Item which should get a callback exactly once. - let item_oneshot = - ItemMaker::insert_new_into2(&monitor, Some(Duration::from_millis(16)), |item| { - item.always_exit = true; - }); + let item_oneshot = ItemMaker::insert_new_into2(&monitor, |item| { + item.always_exit = true; + }); - item42_timeout.write42(); - item42_no_timeout.write42(); + item42.write42(); item42_then_close.write42(); *item42_then_close.writer.lock().expect("Mutex poisoned!") = None; item_oneshot.write42(); @@ -153,44 +134,27 @@ fn fd_monitor_items() { // May need to loop here to ensure our fd_monitor gets scheduled. See #7699. for _ in 0..100 { std::thread::sleep(Duration::from_millis(84)); - if item0_timeout.did_timeout.load(Ordering::Relaxed) { + if item_oneshot.total_calls.load(Ordering::Relaxed) > 0 { break; } } drop(monitor); - assert_eq!(item_never.did_timeout.load(Ordering::Relaxed), false); assert_eq!(item_never.length_read.load(Ordering::Relaxed), 0); assert_eq!(item_never.pokes.load(Ordering::Relaxed), 0); - assert_eq!(item_huge_timeout.did_timeout.load(Ordering::Relaxed), false); - assert_eq!(item_huge_timeout.length_read.load(Ordering::Relaxed), 0); - assert_eq!(item_huge_timeout.pokes.load(Ordering::Relaxed), 0); + assert_eq!(item42.length_read.load(Ordering::Relaxed), 42); + assert_eq!(item42.pokes.load(Ordering::Relaxed), 0); - assert_eq!(item0_timeout.length_read.load(Ordering::Relaxed), 0); - assert_eq!(item0_timeout.did_timeout.load(Ordering::Relaxed), true); - assert_eq!(item0_timeout.pokes.load(Ordering::Relaxed), 0); - - assert_eq!(item42_timeout.length_read.load(Ordering::Relaxed), 42); - assert_eq!(item42_timeout.did_timeout.load(Ordering::Relaxed), true); - assert_eq!(item42_timeout.pokes.load(Ordering::Relaxed), 0); - - assert_eq!(item42_no_timeout.length_read.load(Ordering::Relaxed), 42); - assert_eq!(item42_no_timeout.did_timeout.load(Ordering::Relaxed), false); - assert_eq!(item42_no_timeout.pokes.load(Ordering::Relaxed), 0); - - assert_eq!(item42_then_close.did_timeout.load(Ordering::Relaxed), false); assert_eq!(item42_then_close.length_read.load(Ordering::Relaxed), 42); assert_eq!(item42_then_close.total_calls.load(Ordering::Relaxed), 2); assert_eq!(item42_then_close.pokes.load(Ordering::Relaxed), 0); - assert_eq!(item_oneshot.did_timeout.load(Ordering::Relaxed), false); assert_eq!(item_oneshot.length_read.load(Ordering::Relaxed), 42); assert_eq!(item_oneshot.total_calls.load(Ordering::Relaxed), 1); assert_eq!(item_oneshot.pokes.load(Ordering::Relaxed), 0); - assert_eq!(item_pokee.did_timeout.load(Ordering::Relaxed), false); assert_eq!(item_pokee.length_read.load(Ordering::Relaxed), 0); assert_eq!(item_pokee.total_calls.load(Ordering::Relaxed), 1); assert_eq!(item_pokee.pokes.load(Ordering::Relaxed), 1);