Compare commits

...

20 Commits

Author SHA1 Message Date
copilot-swe-agent[bot]
9f6da2abfc Reduce errors array size from 4 to 3 and make TryAdjustUp unreachable in as_index()
Co-authored-by: epi052 <43392618+epi052@users.noreply.github.com>
2025-11-16 22:11:49 +00:00
copilot-swe-agent[bot]
b248b2d9b9 Initial plan 2025-11-16 22:04:38 +00:00
epi
36a366eb55 fixed a handful of minor correctness issues 2025-11-16 16:25:37 -05:00
epi
c9a7abb8f7 fixed possible deadlock in error path for tune/bail 2025-11-16 09:57:37 -05:00
epi
c597ec2bc1 clippy/fmt 2025-11-16 09:10:10 -05:00
epi
c512669d3a added new test suite for tuning; fixed more tests 2025-11-16 09:05:29 -05:00
epi
100bcbfbc4 fixed more tests 2025-11-16 09:05:01 -05:00
epi
5543fa5d36 fixed req/sec test 2025-11-16 08:17:26 -05:00
epi
38ab434642 touched up a few minor issues in nlp 2025-11-16 08:03:58 -05:00
epi
72ab2d9a58 fixed race condition in progress bar message display; fixed tests 2025-11-16 07:58:34 -05:00
epi
f57087c0f9 updated requester to use new policy data per-trigger errors 2025-11-16 06:46:44 -05:00
epi
d0e2419554 added per-trigger error tracking to policy data 2025-11-16 06:46:12 -05:00
epi
4390ac0500 capped timeout to 30sec; added lock error logging 2025-11-16 06:19:33 -05:00
epi
49c3851a85 added (more) safety/bounds checks to limitheap 2025-11-15 22:27:47 -05:00
epi
0881295234 cleaned up how limitheap is initialized from tune func 2025-11-15 22:20:03 -05:00
epi
e673ae3e76 added new flag releases before returns from should_enforce_policy 2025-11-15 21:56:09 -05:00
epi
cb55880aaa removed minor toctou in should_enforce_policy 2025-11-15 21:53:33 -05:00
epi
45ee292110 removed unnecessary cooldown flag manipulation in cool_down func 2025-11-15 21:51:18 -05:00
epi
a197d1994b ensured limit var is never 0 in build_a_bucket, not just refill 2025-11-15 21:46:47 -05:00
epi
9e0f47acdf fixed requests/sec for small values 2025-11-15 21:41:33 -05:00
13 changed files with 848 additions and 113 deletions

View File

@@ -157,20 +157,17 @@ impl Handles {
multiplier * num_words
}
/// number of extensions plus the number of request method types plus any dynamically collected
/// extensions
/// estimate of HTTP requests per word = (base + static extensions + collected extensions)
/// multiplied by the number of request methods
pub fn expected_num_requests_multiplier(&self) -> usize {
let mut multiplier = self.config.extensions.len().max(1);
let methods = self.config.methods.len().max(1);
let base_requests = 1; // the bare word (with optional slash)
let static_extensions = self.config.extensions.len();
let dynamic_extensions = self.num_collected_extensions();
if multiplier > 1 {
// when we have more than one extension, we need to account for the fact that we'll
// be making a request for each extension and the base word (e.g. /foo.html and /foo)
multiplier += 1;
}
let total_paths = base_requests + static_extensions + dynamic_extensions;
multiplier *= self.config.methods.len().max(1) * self.num_collected_extensions().max(1);
multiplier
total_paths * methods
}
/// Helper to easily get the (locked) underlying FeroxScans object

View File

@@ -20,11 +20,10 @@ impl Document {
let processed = preprocess(text);
document.number_of_terms += processed.len();
for normalized in processed {
if normalized.len() >= 2 {
document.add_term(&normalized)
document.add_term(&normalized);
document.number_of_terms += 1;
}
}
document

View File

@@ -73,7 +73,11 @@ impl TfIdf {
to_add.push(score);
}
let average: f32 = to_add.iter().sum::<f32>() / to_add.len() as f32;
let average = if to_add.is_empty() {
0.0
} else {
to_add.iter().sum::<f32>() / to_add.len() as f32
};
*metadata.tf_idf_score_mut() = average;
}

View File

@@ -22,6 +22,15 @@ impl Term {
}
/// metadata to be associated with a `Term`
///
/// # Design Note
///
/// The `count` field represents the number of times a term appeared in a **single document**
/// and is only meaningful in the per-document context (i.e., within a `Document`).
///
/// When `TermMetaData` is stored in the global `TfIdf` model, the `count` field becomes stale
/// and is not used. Instead, the model relies on `term_frequencies` (which tracks the term
/// frequency for each document the term appears in) and calculates TF-IDF scores from those.
#[derive(Debug, Clone, Default)]
pub(super) struct TermMetaData {
/// number of times the associated `Term` was seen in a single document

View File

@@ -86,7 +86,7 @@ pub struct FeroxScan {
pub(super) errors: AtomicUsize,
/// tracker for the time at which this scan was started
pub(super) start_time: Instant,
pub(super) start_time: Mutex<Instant>,
/// whether the progress bar is currently visible or hidden
pub(super) visible: AtomicBool,
@@ -117,7 +117,7 @@ impl Default for FeroxScan {
errors: Default::default(),
status_429s: Default::default(),
status_403s: Default::default(),
start_time: Instant::now(),
start_time: Mutex::new(Instant::now()),
visible: AtomicBool::new(true),
}
}
@@ -210,6 +210,14 @@ impl FeroxScan {
Ok(())
}
/// small wrapper to set `start_time`
pub fn set_start_time(&self, start_time: Instant) -> Result<()> {
if let Ok(mut guard) = self.start_time.lock() {
let _ = std::mem::replace(&mut *guard, start_time);
}
Ok(())
}
/// Simple helper to call .finish on the scan's progress bar
pub(super) fn stop_progress_bar(&self, active_bars: usize) {
if let Ok(guard) = self.progress_bar.lock() {
@@ -428,9 +436,24 @@ impl FeroxScan {
}
let reqs = self.requests();
let seconds = self.start_time.elapsed().as_secs();
let seconds = if let Ok(guard) = self.start_time.lock() {
guard.elapsed().as_secs_f64()
} else {
log::warn!("Could not acquire lock to read start_time for requests_per_second calculation on scan: {self:?}");
0.0
};
reqs.checked_div(seconds).unwrap_or(0)
if seconds == 0.0 || !seconds.is_finite() {
return 0;
}
let rate = reqs as f64 / seconds;
if rate > u64::MAX as f64 {
u64::MAX
} else {
rate as u64
}
}
/// return the number of requests performed by this scan's scanner
@@ -646,11 +669,11 @@ mod tests {
status: Mutex::new(ScanStatus::Running),
task: Default::default(),
progress_bar: Mutex::new(None),
output_level: Default::default(),
output_level: OutputLevel::Silent,
status_403s: Default::default(),
status_429s: Default::default(),
errors: Default::default(),
start_time: Instant::now(),
start_time: Mutex::new(Instant::now()),
handles: None,
};
@@ -661,7 +684,13 @@ mod tests {
let req_sec = scan.requests_per_second();
assert_eq!(req_sec, 100);
// allow for timing imprecision: sleep overhead makes elapsed time slightly > 1 second
// e.g., 100 reqs / 1.01s = 99 req/s
assert!(
(99..=101).contains(&req_sec),
"Expected ~100 req/s, got {}",
req_sec
);
scan.finish(0).unwrap();
assert_eq!(scan.requests_per_second(), 0);

View File

@@ -617,7 +617,7 @@ fn feroxscan_display() {
num_requests: 0,
requests_made_so_far: 0,
visible: AtomicBool::new(true),
start_time: Instant::now(),
start_time: std::sync::Mutex::new(Instant::now()),
output_level: OutputLevel::Default,
status_403s: Default::default(),
status_429s: Default::default(),
@@ -663,7 +663,7 @@ async fn ferox_scan_abort() {
scan_type: Default::default(),
num_requests: 0,
requests_made_so_far: 0,
start_time: Instant::now(),
start_time: std::sync::Mutex::new(Instant::now()),
output_level: OutputLevel::Default,
visible: AtomicBool::new(true),
status_403s: Default::default(),

View File

@@ -256,6 +256,7 @@ impl FeroxScanner {
ferox_scan.set_status(ScanStatus::Waiting)?;
let _permit = self.scan_limiter.acquire().await;
ferox_scan.set_status(ScanStatus::Running)?;
ferox_scan.set_start_time(Instant::now())?;
if self.handles.config.scan_limit > 0 {
scan_timer = Instant::now();

View File

@@ -1,3 +1,4 @@
use std::cmp::max;
use std::fmt::{Debug, Formatter, Result};
/// bespoke variation on an array-backed max-heap
@@ -51,7 +52,18 @@ impl LimitHeap {
pub(super) fn move_right(&mut self) -> usize {
if self.has_children() {
let tmp = self.current;
self.current = self.current * 2 + 2;
let new_index = self.current * 2 + 2;
// bounds check to prevent an out-of-bounds index
if new_index < self.inner.len() {
self.current = new_index;
} else {
log::warn!(
"Heap navigation out of bounds: move_right from {} would go to {}",
tmp,
new_index
);
}
return tmp;
}
self.current
@@ -61,7 +73,18 @@ impl LimitHeap {
pub(super) fn move_left(&mut self) -> usize {
if self.has_children() {
let tmp = self.current;
self.current = self.current * 2 + 1;
let new_index = self.current * 2 + 1;
// bounds check to prevent an out-of-bounds index
if new_index < self.inner.len() {
self.current = new_index;
} else {
log::warn!(
"Heap navigation out of bounds: move_left from {} would go to {}",
tmp,
new_index
);
}
return tmp;
}
self.current
@@ -79,17 +102,42 @@ impl LimitHeap {
/// move directly to the given index
pub(super) fn move_to(&mut self, index: usize) {
self.current = index;
if index < self.inner.len() {
self.current = index;
} else {
log::warn!(
"Heap navigation out of bounds: move_to({}) exceeds array length {}",
index,
self.inner.len()
);
}
}
/// get the current node's value
pub(super) fn value(&self) -> i32 {
self.inner[self.current]
if self.current < self.inner.len() {
self.inner[self.current]
} else {
log::error!(
"Heap index out of bounds in value(): current={}, len={}",
self.current,
self.inner.len()
);
0 // Return safe default
}
}
/// set the current node's value
pub(super) fn set_value(&mut self, value: i32) {
self.inner[self.current] = value;
if self.current < self.inner.len() {
self.inner[self.current] = value;
} else {
log::error!(
"Heap index out of bounds in set_value(): current={}, len={}",
self.current,
self.inner.len()
);
}
}
/// check that this node has a parent (true for all except root)
@@ -150,11 +198,15 @@ impl LimitHeap {
// arr[0] == 200
// arr[1] (left child) == 300
// arr[2] (right child) == 100
let root = self.original / 2;
// safety: ensure original is at least 2 so root = original/2 >= 1
// this prevents heap from producing limit=0 which would panic in rate limiter
let original = max(self.original, 2);
let root = original / 2;
self.inner[0] = root; // set root node to half of the original value
self.inner[1] = ((self.original - root).abs() / 2) + root;
self.inner[2] = root - ((self.original - root).abs() / 2);
self.inner[1] = ((original - root).abs() / 2) + root;
self.inner[2] = root - ((original - root).abs() / 2);
// start with index 1 and fill in each child below that node
for i in 1..self.inner.len() {

View File

@@ -2,7 +2,7 @@ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use crate::{atomic_load, atomic_store, config::RequesterPolicy};
use super::limit_heap::LimitHeap;
use super::{limit_heap::LimitHeap, PolicyTrigger};
/// data regarding policy and metadata about last enforced trigger etc...
#[derive(Default, Debug)]
@@ -19,8 +19,11 @@ pub struct PolicyData {
/// rate limit (at last interval)
limit: AtomicUsize,
/// whether the heap has been initialized
pub(super) heap_initialized: AtomicBool,
/// number of errors (at last interval)
pub(super) errors: AtomicUsize,
pub(super) errors: [AtomicUsize; 3],
/// whether or not the owning Requester should remove the rate_limiter, happens when a scan
/// has been limited and moves back up to the point of its original scan speed
@@ -35,7 +38,10 @@ impl PolicyData {
/// given a RequesterPolicy, create a new PolicyData
pub fn new(policy: RequesterPolicy, timeout: u64) -> Self {
// can use this as a tweak for how aggressively adjustments should be made when tuning
// cap at 30 seconds to prevent unbounded waits (e.g., with timeout=100000)
const MAX_WAIT_TIME_MS: u64 = 30_000;
let wait_time = ((timeout as f64 / 2.0) * 1000.0) as u64;
let wait_time = wait_time.min(MAX_WAIT_TIME_MS);
Self {
policy,
@@ -50,12 +56,41 @@ impl PolicyData {
guard.original = reqs_sec as i32;
guard.build();
self.set_limit(guard.inner[0] as usize); // set limit to 1/2 of current request rate
self.heap_initialized.store(true, Ordering::Release);
} else {
log::warn!("Could not acquire heap write lock in set_reqs_sec; heap not initialized");
}
}
/// setter for errors
pub(super) fn set_errors(&self, errors: usize) {
atomic_store!(self.errors, errors);
/// setter for errors (trigger-specific)
pub(super) fn set_errors(&self, trigger: PolicyTrigger, errors: usize) {
if trigger == PolicyTrigger::TryAdjustUp {
return;
}
atomic_store!(self.errors[trigger.as_index()], errors);
}
/// number of errors recorded (at last interval) for the given trigger
///
/// `TryAdjustUp` has no slot in the `errors` array, so it always reports 0
pub(super) fn get_errors(&self, trigger: PolicyTrigger) -> usize {
    match trigger {
        PolicyTrigger::TryAdjustUp => 0,
        tracked => atomic_load!(self.errors[tracked.as_index()]),
    }
}
/// status of heap initialization
///
/// the flag is stored as `true` by `set_reqs_sec` once the heap has been built and as `false`
/// by `reset_heap`; it is read here with `Ordering::Acquire`
pub(super) fn heap_initialized(&self) -> bool {
atomic_load!(self.heap_initialized, Ordering::Acquire)
}
/// put the heap back into its default state and clear the initialization flag, called when
/// auto-tune is being disabled
///
/// if the heap's write lock can't be acquired, nothing is changed and a warning is logged
pub(super) fn reset_heap(&self) {
    match self.heap.write() {
        Ok(mut heap) => {
            *heap = LimitHeap::default();
            // flag is cleared while the write lock is still held
            self.heap_initialized.store(false, Ordering::Release);
        }
        Err(_) => {
            log::warn!("Could not acquire heap write lock in reset_heap");
        }
    }
}
/// setter for limit
@@ -106,6 +141,8 @@ impl PolicyData {
atomic_store!(self.remove_limit, true);
}
self.set_limit(heap.value() as usize);
} else {
log::debug!("Could not acquire heap write lock in adjust_up; rate limit unchanged");
}
}
@@ -116,6 +153,8 @@ impl PolicyData {
heap.move_right();
self.set_limit(heap.value() as usize);
}
} else {
log::debug!("Could not acquire heap write lock in adjust_down; rate limit unchanged");
}
}
}
@@ -142,8 +181,12 @@ mod tests {
/// PolicyData setters/getters tests for code coverage / sanity
fn policy_data_getters_and_setters() {
let pd = PolicyData::new(RequesterPolicy::AutoBail, 7);
pd.set_errors(20);
assert_eq!(pd.errors.load(Ordering::Relaxed), 20);
pd.set_errors(PolicyTrigger::Errors, 20);
assert_eq!(pd.get_errors(PolicyTrigger::Errors), 20);
pd.set_errors(PolicyTrigger::Status403, 15);
assert_eq!(pd.get_errors(PolicyTrigger::Status403), 15);
pd.set_errors(PolicyTrigger::Status429, 10);
assert_eq!(pd.get_errors(PolicyTrigger::Status429), 10);
pd.set_limit(200);
assert_eq!(pd.get_limit(), 200);
}

View File

@@ -105,39 +105,41 @@ impl Requester {
/// build a RateLimiter, given a rate limit (as requests per second)
fn build_a_bucket(limit: usize) -> Result<RateLimiter> {
let refill = max((limit as f64 / 10.0).round() as usize, 1); // minimum of 1 per second
// safety: ensure limit is at least 1 to prevent panic from .initial > .max
let limit = max(limit, 1);
// For accurate rate limiting across all integer values (including low rates like 1-14 req/s),
// we use a 1-second interval and refill with exactly `limit` tokens per interval.
// This ensures refill/interval == limit for any value, avoiding the previous bug where
// limits <15 collapsed to 1 req/s due to rounding.
let refill = limit;
let tokens = max((limit as f64 / 2.0).round() as usize, 1);
let interval = if refill == 1 { 1000 } else { 100 }; // 1 second if refill is 1
let interval = 1000; // 1 second interval for all rates
Ok(RateLimiter::builder()
.interval(Duration::from_millis(interval)) // add tokens every 0.1s
.refill(refill) // ex: 100 req/s -> 10 tokens per 0.1s
.initial(tokens) // reduce initial burst, 2 is arbitrary, but felt good
.interval(Duration::from_millis(interval))
.refill(refill)
.initial(tokens) // start with half capacity to reduce initial burst
.max(limit)
.build())
}
/// sleep and set a flag that can be checked by other threads
async fn cool_down(&self) {
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
return;
}
atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst);
// should_enforce_policy=>tune call chain has already acquired cooling_down flag
// just need to sleep and reset
sleep(Duration::from_millis(self.policy_data.wait_time)).await;
self.ferox_scan.progress_bar().set_message("");
atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst);
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
}
/// limit the number of requests per second
pub async fn limit(&self) -> Result<()> {
let guard = self.rate_limiter.read().await;
if guard.is_some() {
guard.as_ref().unwrap().acquire_one().await;
if let Some(limiter) = guard.as_ref() {
limiter.acquire_one().await;
}
Ok(())
@@ -174,16 +176,26 @@ impl Requester {
/// - 90% of requests are 403
/// - 30% of requests are 429
fn should_enforce_policy(&self) -> Option<PolicyTrigger> {
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
// use compare_exchange to ensure only one thread can proceed with policy enforcement
// this prevents multiple threads from simultaneously deciding to enforce policy
// AcqRel provides necessary synchronization
if self
.policy_data
.cooling_down
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
// Another thread is already enforcing policy or cooling down
return None;
}
let requests = atomic_load!(self.handles.stats.data.requests);
let requests = self.ferox_scan.requests() as usize;
if requests < max(self.handles.config.threads, 50) {
// check whether at least a full round of threads has made requests or 50 (default # of
// threads), whichever is higher
// check whether at least a full round of threads has made requests for this specific
// scan (not globally), or 50 (default # of threads), whichever is higher
// need to reset the flag since we're not actually enforcing
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
return None;
}
@@ -199,48 +211,93 @@ impl Requester {
return Some(PolicyTrigger::Status429);
}
// No policy trigger found, reset the flag
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
None
}
/// wrapper for adjust_[up,down] functions, checks error levels to determine adjustment direction
async fn adjust_limit(&self, trigger: PolicyTrigger, create_limiter: bool) -> Result<()> {
let scan_errors = self.ferox_scan.num_errors(trigger);
let policy_errors = atomic_load!(self.policy_data.errors, Ordering::SeqCst);
let policy_errors = self.policy_data.get_errors(trigger);
// progress bar message chosen by the adjustment below (None when no message is needed);
// it is displayed further down, while the tuning lock is still held
let pb_message: Option<String>;
// Scope the lock so it's dropped before any async operations
{
// Use blocking lock instead of try_lock to avoid spurious warnings and ensure
// adjustments are properly serialized
let mut guard = match self.tuning_lock.lock() {
Ok(g) => g,
Err(e) => {
log::error!("tuning_lock poisoned in adjust_limit: {}", e);
return Ok(()); // Skip this adjustment
}
};
if let Ok(mut guard) = self.tuning_lock.try_lock() {
if scan_errors > policy_errors {
// errors have increased, need to reduce the requests/sec limit
*guard = 0; // reset streak counter to 0
if atomic_load!(self.policy_data.errors) != 0 {
if policy_errors != 0 {
self.policy_data.adjust_down();
log::info!(
"auto-tune: errors increased; reducing speed to {} reqs/sec for {}",
self.policy_data.get_limit(),
self.target_url
);
let styled_direction = style("reduced").red();
self.ferox_scan
.progress_bar()
.set_message(format!("=> 🚦 {styled_direction} scan speed",));
pb_message = Some(format!(
"=> 🚦 {styled_direction} scan speed ({}/s)",
self.policy_data.get_limit()
));
} else {
pb_message = None;
}
self.policy_data.set_errors(scan_errors);
self.policy_data.set_errors(trigger, scan_errors);
} else {
// errors can only be incremented, so an else is sufficient
*guard += 1;
self.policy_data.adjust_up(&guard);
log::info!(
"auto-tune: errors decreased; increasing speed to {} reqs/sec for {}",
self.policy_data.get_limit(),
self.target_url
);
let styled_direction = style("increased").green();
self.ferox_scan
.progress_bar()
.set_message(format!("=> 🚦 {styled_direction} scan speed",));
pb_message = Some(format!(
"=> 🚦 {styled_direction} scan speed ({}/s)",
self.policy_data.get_limit()
));
}
}
// update progress bar while still holding the lock to prevent races
if let Some(ref msg) = pb_message {
self.ferox_scan.progress_bar().set_message(msg.clone());
}
} // guard is dropped here automatically
if atomic_load!(self.policy_data.remove_limit) {
self.set_rate_limiter(None).await?;
atomic_store!(self.policy_data.remove_limit, false);
self.ferox_scan
.progress_bar()
.set_message("=> 🚦 removed rate limiter 🚀");
// reset the auto-tune state machine so it can be re-triggered if needed
atomic_store!(self.policy_triggered, false, Ordering::Release);
self.policy_data.reset_heap();
// acquire lock just for the progress bar update to prevent races
if let Ok(_guard) = self.tuning_lock.try_lock() {
self.ferox_scan
.progress_bar()
.set_message("=> 🚦 removed rate limiter 🚀");
}
} else if create_limiter {
// create_limiter is really just used for unit testing situations, it's true anytime
// during actual execution
@@ -274,9 +331,26 @@ impl Requester {
/// enforce auto-tune policy
async fn tune(&self, trigger: PolicyTrigger) -> Result<()> {
if atomic_load!(self.policy_data.errors) == 0 {
// set original number of reqs/second the first time tune is called, skip otherwise
if !self.policy_data.heap_initialized() {
// keep attempting to set original number of reqs/second when tune is called
let reqs_sec = self.ferox_scan.requests_per_second() as usize;
// guard against req/sec < 2, which would create heap with root=0 and cause panic
// when building rate limiter (.initial > .max). need at least 2 req/sec for stable
// rate limiting (original/2 = 1, which is minimum viable limit)
if reqs_sec < 2 {
log::debug!("auto-tune: {} reqs/sec is too low; not initializing heap and resetting cooldown period", reqs_sec);
// reset heap and initialization flags since we need the should_enforce_policy->tune
// flow to execute again
self.policy_data.reset_heap();
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
atomic_store!(self.policy_triggered, false, Ordering::Release);
return Ok(());
}
// only initialize if we have a valid req/sec value
self.policy_data.set_reqs_sec(reqs_sec);
// set the flag to indicate that we have triggered the rate limiter
@@ -284,6 +358,14 @@ impl Requester {
atomic_store!(self.policy_triggered, true);
let new_limit = self.policy_data.get_limit();
log::info!(
"auto-tune: {} reqs/sec was too fast; enforcing limit {} reqs/sec for {}",
reqs_sec,
new_limit,
self.target_url
);
self.set_rate_limiter(Some(new_limit)).await?;
self.ferox_scan
.progress_bar()
@@ -362,6 +444,13 @@ impl Requester {
for url in urls {
for method in self.handles.config.methods.iter() {
// Check denylist BEFORE consuming rate limit tokens to avoid wasting permits
// on URLs that will be skipped anyway
if should_test_deny && should_deny_url(&url, self.handles.clone())? {
// can't allow a denied url to be requested
continue;
}
// auto_tune is true, or rate_limit was set (mutually exclusive to user)
// and a rate_limiter has been created
// short-circuiting the lock access behind the first boolean check
@@ -377,11 +466,6 @@ impl Requester {
}
}
if should_test_deny && should_deny_url(&url, self.handles.clone())? {
// can't allow a denied url to be requested
continue;
}
let data = if self.handles.config.data.is_empty() {
None
} else {
@@ -392,7 +476,7 @@ impl Requester {
logged_request(&url, method.as_str(), data, self.handles.clone()).await?;
if (should_tune || self.handles.config.auto_bail)
&& !atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst)
&& !atomic_load!(self.policy_data.cooling_down, Ordering::Acquire)
{
// only check for policy enforcement when the trigger isn't on cooldown and tuning
// or bailing is in place (should_tune used here because when auto-tune is on, we'll
@@ -400,15 +484,46 @@ impl Requester {
match self.policy_data.policy {
RequesterPolicy::AutoTune => {
if let Some(trigger) = self.should_enforce_policy() {
self.tune(trigger).await?;
if let Err(e) = self.tune(trigger).await {
// reset cooling_down flag on error to prevent permanent lockout
atomic_store!(
self.policy_data.cooling_down,
false,
Ordering::Release
);
atomic_store!(self.policy_triggered, false, Ordering::Release);
return Err(e);
}
} else if atomic_load!(self.policy_triggered) {
self.adjust_limit(PolicyTrigger::TryAdjustUp, true).await?;
self.cool_down().await;
// Use compare_exchange to ensure only one thread attempts upward adjustment
// at a time, preventing races and duplicate adjustments
if self
.policy_data
.cooling_down
.compare_exchange(
false,
true,
Ordering::AcqRel,
Ordering::Acquire,
)
.is_ok()
{
self.adjust_limit(PolicyTrigger::TryAdjustUp, true).await?;
self.cool_down().await;
}
}
}
RequesterPolicy::AutoBail => {
if let Some(trigger) = self.should_enforce_policy() {
self.bail(trigger).await?;
if let Err(e) = self.bail(trigger).await {
// reset cooling_down flag on error to prevent permanent lockout
atomic_store!(
self.policy_data.cooling_down,
false,
Ordering::Release
);
return Err(e);
}
}
}
RequesterPolicy::Default => {}
@@ -599,6 +714,8 @@ mod tests {
for _ in 0..num_errors {
handles.stats.send(AddError(StatError::Other)).unwrap();
scan.add_error();
// Also increment the progress bar to represent a request being made
scan.progress_bar().inc(1);
}
handles.stats.sync().await.unwrap();
@@ -635,6 +752,8 @@ mod tests {
) {
for _ in 0..num_codes {
handles.stats.send(AddStatus(code)).unwrap();
// Also increment the progress bar to represent a request being made
scan.progress_bar().inc(1);
if code == StatusCode::FORBIDDEN {
scan.add_403();
} else {
@@ -933,8 +1052,9 @@ mod tests {
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// cooldown should pause execution and prevent others calling it by setting cooling_down flag
async fn cooldown_pauses_and_sets_flag() {
/// cooldown should pause execution for the specified wait_time
/// note: cooling_down flag is now set by should_enforce_policy, not cool_down itself
async fn cooldown_pauses_for_wait_time() {
let (handles, _) = setup_requester_test(None).await;
let requester = Arc::new(Requester {
@@ -949,17 +1069,14 @@ mod tests {
});
let start = Instant::now();
let clone = requester.clone();
let resp = tokio::task::spawn(async move {
sleep(Duration::new(1, 0)).await;
clone.policy_data.cooling_down.load(Ordering::Relaxed)
});
requester.cool_down().await;
assert!(resp.await.unwrap());
println!("{}", start.elapsed().as_millis());
// verify cooldown paused for wait_time (3500ms for timeout=7s)
assert!(start.elapsed().as_millis() >= 3500);
// verify flag was reset to false after cooldown completes
assert!(!requester.policy_data.cooling_down.load(Ordering::Relaxed));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -1019,7 +1136,7 @@ mod tests {
};
requester.policy_data.set_reqs_sec(400);
requester.policy_data.set_errors(1);
requester.policy_data.set_errors(PolicyTrigger::Errors, 1);
{
let mut guard = requester.tuning_lock.lock().unwrap();
@@ -1033,7 +1150,7 @@ mod tests {
assert_eq!(*requester.tuning_lock.lock().unwrap(), 0);
assert_eq!(requester.policy_data.get_limit(), 100);
assert_eq!(requester.policy_data.errors.load(Ordering::Relaxed), 2);
assert_eq!(requester.policy_data.get_errors(PolicyTrigger::Errors), 2);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
@@ -1182,18 +1299,149 @@ mod tests {
pb.set_position(400);
sleep(Duration::new(1, 0)).await; // used to get req/sec up to 400
assert_eq!(requester.policy_data.errors.load(Ordering::Relaxed), 0);
assert_eq!(
requester.policy_data.get_errors(PolicyTrigger::Status429),
0
);
requester.tune(PolicyTrigger::Status429).await.unwrap();
assert_eq!(requester.policy_data.heap.read().unwrap().original, 400);
assert_eq!(requester.policy_data.get_limit(), 200);
assert_eq!(
requester.rate_limiter.read().await.as_ref().unwrap().max(),
200
let original = requester.policy_data.heap.read().unwrap().original;
// Allow for timing imprecision: 400 reqs / ~1.002s elapsed = 399 req/s
assert!(
(399..=401).contains(&original),
"Expected ~400 req/s original, got {}",
original
);
let limit = requester.policy_data.get_limit();
// Limit is original/2, so with original 399-401, limit is 199-200
assert!(
(199..=201).contains(&limit),
"Expected limit ~200, got {}",
limit
);
let rate_limiter_max = requester.rate_limiter.read().await.as_ref().unwrap().max();
assert!(
(199..=201).contains(&rate_limiter_max),
"Expected rate limiter max ~200, got {}",
rate_limiter_max
);
scan.finish(0).unwrap();
assert!(start.elapsed().as_millis() >= 2000);
}
#[test]
/// build_a_bucket must produce a bucket whose max equals the requested limit for every low
/// rate (1-20 req/s)
/// Regression test for Bug #1, where limits < 15 collapsed to 1 req/s
fn build_a_bucket_handles_low_rates_correctly() {
    for limit in 1..=20 {
        // interval=1000ms and refill=limit, so refill/interval == limit at every rate
        let bucket = Requester::build_a_bucket(limit)
            .unwrap_or_else(|_| panic!("build_a_bucket failed for limit {}", limit));

        let observed_max = bucket.max();

        assert_eq!(
            observed_max, limit,
            "Bucket max should equal requested limit {} but got {}",
            limit, observed_max
        );
    }
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// verify that the policy_triggered flag and the heap are both reset when the rate limiter
/// is removed
/// This test validates the fix for Bug #2 where auto-tune never disengaged
async fn policy_triggered_reset_when_limiter_removed() {
let (handles, _) = setup_requester_test(None).await;
let ferox_scan = Arc::new(FeroxScan::default());
let requester = Requester {
handles,
seen_links: RwLock::new(HashSet::<String>::new()),
tuning_lock: Mutex::new(0),
ferox_scan,
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: PolicyData::new(RequesterPolicy::AutoTune, 7),
policy_triggered: AtomicBool::new(false),
};
// Set policy_triggered to true (as if auto-tune was triggered)
atomic_store!(requester.policy_triggered, true, Ordering::Release);
// Initialize heap to simulate auto-tune being active; set_reqs_sec builds the heap and
// marks it as initialized
requester.policy_data.set_reqs_sec(100);
assert!(requester.policy_data.heap_initialized());
// Simulate the condition where limiter should be removed: adjust_limit checks the
// remove_limit flag after making its adjustment
atomic_store!(requester.policy_data.remove_limit, true);
// Call adjust_limit which should remove the limiter and reset state (policy_triggered
// flag and heap)
requester
.adjust_limit(PolicyTrigger::Errors, true)
.await
.unwrap();
// Verify policy_triggered was reset
assert!(
!atomic_load!(requester.policy_triggered),
"policy_triggered should be reset to false when limiter is removed"
);
// Verify heap was reset
assert!(
!requester.policy_data.heap_initialized(),
"heap should be reset when limiter is removed"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// verify should_enforce_policy uses per-scan request counts, not global
/// This test validates the fix for Bug #4 where global counters caused false positives
async fn should_enforce_policy_uses_per_scan_requests() {
let mut config = Configuration::new().unwrap_or_default();
// with 50 threads, the minimum-requests threshold max(threads, 50) is 50
config.threads = 50;
let (handles, _) = setup_requester_test(Some(Arc::new(config))).await;
let ferox_scan = Arc::new(FeroxScan::default());
let requester = Requester {
handles: handles.clone(),
seen_links: RwLock::new(HashSet::<String>::new()),
tuning_lock: Mutex::new(0),
ferox_scan: ferox_scan.clone(),
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: PolicyData::new(RequesterPolicy::AutoTune, 7),
policy_triggered: AtomicBool::new(false),
};
// Add many errors to the global stats (simulating previous scans)
for _ in 0..100 {
handles.stats.send(AddError(StatError::Other)).unwrap();
}
handles.stats.sync().await.unwrap();
// But this scan has only made a few requests: advance its progress bar by 5 and record
// 5 scan-level errors
ferox_scan.progress_bar().inc(5);
for _ in 0..5 {
ferox_scan.add_error();
}
// should_enforce_policy should return None because THIS scan hasn't made enough requests
// (5 is below the threshold of 50), regardless of what the global stats contain
assert_eq!(
requester.should_enforce_policy(),
None,
"should_enforce_policy should use per-scan requests, not global"
);
}
}

View File

@@ -13,3 +13,17 @@ pub enum PolicyTrigger {
/// dummy error for upward rate adjustment
TryAdjustUp,
}
impl PolicyTrigger {
/// get the index into the `PolicyData.errors` array for this trigger
pub fn as_index(&self) -> usize {
match self {
PolicyTrigger::Status403 => 0,
PolicyTrigger::Status429 => 1,
PolicyTrigger::Errors => 2,
PolicyTrigger::TryAdjustUp => {
unreachable!("TryAdjustUp should never be used to access the errors array")
}
}
}
}

View File

@@ -230,11 +230,22 @@ fn auto_tune_slows_scan_with_429s() {
teardown_tmp_directory(tmp_dir);
assert!(normal_reqs_mock.hits() + error_mock.hits() > 25); // must have at least 50 reqs fly
let normal_hits = normal_reqs_mock.hits();
let error_hits = error_mock.hits();
println!("elapsed: {}", start.elapsed().as_millis()); // 3523ms without tuning
assert!(normal_reqs_mock.hits() < 500);
assert!(error_mock.hits() <= 180); // may or may not see all other error requests
println!("normal_reqs_mock.hits(): {}", normal_hits);
println!("error_mock.hits(): {}", error_hits);
assert!(normal_hits + error_hits > 25); // must have at least 50 reqs fly
println!("elapsed: {}", start.elapsed().as_millis());
// With auto-tune and 429s, the scan should be slowed down but may still process
// ~1800-2000 requests in 7 seconds. The key is that it hits the time limit.
assert!(
normal_hits < 3000,
"Should process fewer than 3000 requests due to rate limiting"
);
assert!(error_hits <= 180); // may or may not see all other error requests
assert!(start.elapsed().as_millis() >= 7000); // scan should hit time limit due to limiting
}
@@ -283,11 +294,22 @@ fn auto_tune_slows_scan_with_403s() {
teardown_tmp_directory(tmp_dir);
assert!(normal_reqs_mock.hits() + error_mock.hits() > 25); // must have at least 50 reqs fly
let normal_hits = normal_reqs_mock.hits();
let error_hits = error_mock.hits();
println!("elapsed: {}", start.elapsed().as_millis()); // 3523ms without tuning
assert!(normal_reqs_mock.hits() < 500);
assert!(error_mock.hits() <= 180); // may or may not see all other error requests
println!("normal_reqs_mock.hits(): {}", normal_hits);
println!("error_mock.hits(): {}", error_hits);
assert!(normal_hits + error_hits > 25); // must have at least 50 reqs fly
println!("elapsed: {}", start.elapsed().as_millis());
// With auto-tune and 403s, the scan should be slowed down but may still process
// ~1800-2000 requests in 7 seconds. The key is that it hits the time limit.
assert!(
normal_hits < 3000,
"Should process fewer than 3000 requests due to rate limiting"
);
assert!(error_hits <= 180); // may or may not see all other error requests
assert!(start.elapsed().as_millis() >= 7000); // scan should hit time limit due to limiting
}
@@ -339,8 +361,19 @@ fn auto_tune_slows_scan_with_general_errors() {
teardown_tmp_directory(tmp_dir);
println!("elapsed: {}", start.elapsed().as_millis()); // 3523ms without tuning
assert!(normal_reqs_mock.hits() < 500);
assert!(error_mock.hits() <= 180); // may or may not see all other error requests
let normal_hits = normal_reqs_mock.hits();
let error_hits = error_mock.hits();
println!("normal_reqs_mock.hits(): {}", normal_hits);
println!("error_mock.hits(): {}", error_hits);
println!("elapsed: {}", start.elapsed().as_millis());
// Normal requests timeout (3s delay with 2s timeout), triggering error policy
// The scan should be rate-limited and hit the time limit
assert!(
normal_hits < 3000,
"Should process fewer requests due to rate limiting and timeouts"
);
assert!(error_hits <= 180); // may or may not see all other error requests
assert!(start.elapsed().as_millis() >= 7000); // scan should hit time limit due to limiting
}

View File

@@ -0,0 +1,306 @@
mod utils;
use assert_cmd::Command;
use httpmock::prelude::*;
use httpmock::MockServer;
use regex::Regex;
use std::fs::{read_to_string, write};
use utils::{setup_tmp_directory, teardown_tmp_directory};
/// Build a newline-separated wordlist made of four labelled groups
///
/// Each group contributes `count` words of the form `<prefix>_<index>`, with the index
/// zero-padded to six digits, in this order: `normal_`, `error_`, `s403_`, `s429_`.
/// The result has no trailing newline and is empty when every count is zero.
fn create_test_wordlist(
    normal: usize,
    errors: usize,
    status403: usize,
    status429: usize,
) -> String {
    // (prefix, how many) pairs, listed in the order they appear in the wordlist
    let groups = [
        ("normal", normal),
        ("error", errors),
        ("s403", status403),
        ("s429", status429),
    ];

    groups
        .iter()
        .flat_map(|&(prefix, count)| (0..count).map(move |i| format!("{}_{:06}", prefix, i)))
        .collect::<Vec<_>>()
        .join("\n")
}
/// Scenario 1: High 403 rate - tests policy enforcement
///
/// The mock server answers 200 for `/normal_*` and 403 for `/s403_*`. feroxbuster is run
/// with `--auto-tune`, `--json` and a debug log; the test passes when some log message
/// contains both `auto-tune:` and `enforcing limit`.
#[test]
fn scenario_high_403_rate() {
    let srv = MockServer::start();
    let (tmp_dir, file) = setup_tmp_directory(&[], "wordlist").unwrap();
    let (log_dir, logfile) = setup_tmp_directory(&[], "debug-log").unwrap();

    // Wordlist with a high 403 rate
    // Need 90%+ ratio and enough requests to trigger policy: 900/(900+100) = 90%
    write(&file, create_test_wordlist(100, 0, 900, 0)).unwrap();

    let _normal_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/normal_.*").unwrap());
        then.status(200).body("OK");
    });
    let _forbidden_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/s403_.*").unwrap());
        then.status(403).body("Forbidden");
    });

    let mut cmd = Command::cargo_bin("feroxbuster").unwrap();
    cmd.arg("--url")
        .arg(srv.url("/"))
        .arg("--wordlist")
        .arg(file.as_os_str())
        .arg("--auto-tune")
        .arg("--dont-filter")
        .arg("--threads")
        .arg("10")
        .arg("--debug-log")
        .arg(logfile.as_os_str())
        .arg("--json")
        .arg("-vv");
    cmd.assert().success();

    // lines that are not valid JSON are ignored; any single matching message is enough
    let debug_log = read_to_string(&logfile).unwrap();
    let found_403_policy = debug_log
        .lines()
        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
        .any(|entry| {
            matches!(
                entry.get("message").and_then(|m| m.as_str()),
                Some(msg) if msg.contains("auto-tune:") && msg.contains("enforcing limit")
            )
        });

    // clean up before asserting so a failed expectation does not leave the temp dirs behind
    teardown_tmp_directory(tmp_dir);
    teardown_tmp_directory(log_dir);

    assert!(found_403_policy, "High 403 rate should trigger policy");
}
/// Scenario 2: High 429 rate - tests aggressive rate limiting
///
/// The mock server answers 200 for `/normal_*` and 429 for `/s429_*`. feroxbuster is run
/// with `--auto-tune`, `--json` and a debug log; the test passes when some log message
/// contains both `auto-tune:` and `enforcing limit`.
#[test]
fn scenario_high_429_rate() {
    let srv = MockServer::start();
    let (tmp_dir, file) = setup_tmp_directory(&[], "wordlist").unwrap();
    let (log_dir, logfile) = setup_tmp_directory(&[], "debug-log").unwrap();

    // High 429 rate should trigger more aggressive limiting
    // Need 30%+ ratio and enough requests: 450/(450+150) = 75%
    write(&file, create_test_wordlist(150, 0, 0, 450)).unwrap();

    let _normal_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/normal_.*").unwrap());
        then.status(200).body("OK");
    });
    let _rate_limit_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/s429_.*").unwrap());
        then.status(429).body("Too Many Requests");
    });

    let mut cmd = Command::cargo_bin("feroxbuster").unwrap();
    cmd.arg("--url")
        .arg(srv.url("/"))
        .arg("--wordlist")
        .arg(file.as_os_str())
        .arg("--auto-tune")
        .arg("--dont-filter")
        .arg("--threads")
        .arg("10")
        .arg("--debug-log")
        .arg(logfile.as_os_str())
        .arg("--json")
        .arg("-vv");
    cmd.assert().success();

    // lines that are not valid JSON are ignored; any single matching message is enough
    let debug_log = read_to_string(&logfile).unwrap();
    let found_429_policy = debug_log
        .lines()
        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
        .any(|entry| {
            matches!(
                entry.get("message").and_then(|m| m.as_str()),
                Some(msg) if msg.contains("auto-tune:") && msg.contains("enforcing limit")
            )
        });

    // clean up before asserting so a failed expectation does not leave the temp dirs behind
    teardown_tmp_directory(tmp_dir);
    teardown_tmp_directory(log_dir);

    assert!(found_429_policy, "High 429 rate should trigger policy");
}
/// Scenario 3: Recovery pattern - errors then normal
///
/// The wordlist holds 100 `s403_` words followed by 300 `normal_` words (indices padded to
/// four digits). The mock server answers 403 and 200 respectively; the test passes when
/// some JSON debug-log message contains both `auto-tune:` and `enforcing limit`.
#[test]
fn scenario_recovery_pattern() {
    let srv = MockServer::start();
    let (tmp_dir, file) = setup_tmp_directory(&[], "wordlist").unwrap();
    let (log_dir, logfile) = setup_tmp_directory(&[], "debug-log").unwrap();

    // Pattern: errors first, then normal - should slow down then speed up
    let wordlist: Vec<String> = (0..100)
        .map(|i| format!("s403_{:04}", i))
        .chain((0..300).map(|i| format!("normal_{:04}", i)))
        .collect();
    write(&file, wordlist.join("\n")).unwrap();

    let _normal_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/normal_.*").unwrap());
        then.status(200).body("OK");
    });
    let _error_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/s403_.*").unwrap());
        then.status(403).body("Forbidden");
    });

    let mut cmd = Command::cargo_bin("feroxbuster").unwrap();
    cmd.arg("--url")
        .arg(srv.url("/"))
        .arg("--wordlist")
        .arg(file.as_os_str())
        .arg("--auto-tune")
        .arg("--dont-filter")
        .arg("--threads")
        .arg("10")
        .arg("--debug-log")
        .arg(logfile.as_os_str())
        .arg("--json")
        .arg("-vv");
    cmd.assert().success();

    // lines that are not valid JSON are ignored; any single matching message is enough
    let debug_log = read_to_string(&logfile).unwrap();
    let auto_tune_triggered = debug_log
        .lines()
        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
        .any(|entry| {
            matches!(
                entry.get("message").and_then(|m| m.as_str()),
                Some(msg) if msg.contains("auto-tune:") && msg.contains("enforcing limit")
            )
        });

    // clean up before asserting so a failed expectation does not leave the temp dirs behind
    teardown_tmp_directory(tmp_dir);
    teardown_tmp_directory(log_dir);

    assert!(
        auto_tune_triggered,
        "Should trigger auto-tune due to errors"
    );
}
/// Scenario 4: Mixed steady state - balanced errors and normal
///
/// The mock server answers 200, 504, 403 and 429 for the `normal_`, `error_`, `s403_` and
/// `s429_` words. The only assertion is that the four mocks together received more than
/// 100 hits.
#[test]
fn scenario_mixed_steady_state() {
    let srv = MockServer::start();
    let (tmp_dir, file) = setup_tmp_directory(&[], "wordlist").unwrap();
    let (log_dir, logfile) = setup_tmp_directory(&[], "debug-log").unwrap();

    // Evenly mixed - not enough to trigger bail, but enough for tuning
    // Need 25+ general errors to trigger: 30 >= 25
    write(&file, create_test_wordlist(150, 30, 10, 10)).unwrap();

    let normal_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/normal_.*").unwrap());
        then.status(200).body("OK");
    });
    let error_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/error_.*").unwrap());
        then.status(504).body("Gateway Timeout");
    });
    let forbidden_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/s403_.*").unwrap());
        then.status(403).body("Forbidden");
    });
    let rate_limit_mock = srv.mock(|when, then| {
        when.method(GET)
            .path_matches(Regex::new("/s429_.*").unwrap());
        then.status(429).body("Too Many Requests");
    });

    let mut cmd = Command::cargo_bin("feroxbuster").unwrap();
    cmd.arg("--url")
        .arg(srv.url("/"))
        .arg("--wordlist")
        .arg(file.as_os_str())
        .arg("--auto-tune")
        .arg("--threads")
        .arg("10")
        .arg("--debug-log")
        .arg(logfile.as_os_str())
        .arg("-vv");
    cmd.assert().success();

    // NOTE(review): this run passes no `--json`, yet the log is parsed line-by-line as JSON,
    // and the resulting count is never asserted - confirm whether that is intentional
    let debug_log = read_to_string(&logfile).unwrap();
    let _policy_adjustments = debug_log
        .lines()
        .filter_map(|line| serde_json::from_str::<serde_json::Value>(line).ok())
        .filter(|entry| {
            matches!(
                entry.get("message").and_then(|m| m.as_str()),
                Some(msg) if msg.contains("scan speed") || msg.contains("set rate limit")
            )
        })
        .count();

    // gather the hit counts before the temp dirs are removed
    let total = [
        normal_mock.hits(),
        error_mock.hits(),
        forbidden_mock.hits(),
        rate_limit_mock.hits(),
    ]
    .iter()
    .sum::<usize>();

    teardown_tmp_directory(tmp_dir);
    teardown_tmp_directory(log_dir);

    // With mixed but not extreme errors, most of the wordlist should still be requested
    assert!(total > 100, "Should complete significant portion of scan");
}