Compare commits

...

2 Commits

Author SHA1 Message Date
epi
2485a2acbd fixed multiple bugs in rate limiting code 2025-11-11 21:50:14 -05:00
epi
8ef7519b1a fixed term counting and div by 0 bugs in nlp 2025-11-11 21:41:34 -05:00
5 changed files with 71 additions and 18 deletions

View File

@@ -20,11 +20,10 @@ impl Document {
let processed = preprocess(text);
document.number_of_terms += processed.len();
for normalized in processed {
if normalized.len() >= 2 {
document.add_term(&normalized)
document.add_term(&normalized);
document.number_of_terms += 1;
}
}
document

View File

@@ -73,7 +73,11 @@ impl TfIdf {
to_add.push(score);
}
let average: f32 = to_add.iter().sum::<f32>() / to_add.len() as f32;
let average = if to_add.is_empty() {
0.0
} else {
to_add.iter().sum::<f32>() / to_add.len() as f32
};
*metadata.tf_idf_score_mut() = average;
}

View File

@@ -22,6 +22,15 @@ impl Term {
}
/// metadata to be associated with a `Term`
///
/// # Design Note
///
/// The `count` field represents the number of times a term appeared in a **single document**
/// and is only meaningful in the per-document context (i.e., within a `Document`).
///
/// When `TermMetaData` is stored in the global `TfIdf` model, the `count` field becomes stale
/// and is not used. Instead, the model relies on `term_frequencies` (which tracks the term
/// frequency for each document the term appears in) and calculates TF-IDF scores from those.
#[derive(Debug, Clone, Default)]
pub(super) struct TermMetaData {
/// number of times the associated `Term` was seen in a single document

View File

@@ -35,7 +35,10 @@ impl PolicyData {
/// given a RequesterPolicy, create a new PolicyData
pub fn new(policy: RequesterPolicy, timeout: u64) -> Self {
// can use this as a tweak for how aggressively adjustments should be made when tuning
// cap at 30 seconds to prevent unbounded waits (e.g., with timeout=100000)
const MAX_WAIT_TIME_MS: u64 = 30_000;
let wait_time = ((timeout as f64 / 2.0) * 1000.0) as u64;
let wait_time = wait_time.min(MAX_WAIT_TIME_MS);
Self {
policy,
@@ -50,6 +53,8 @@ impl PolicyData {
guard.original = reqs_sec as i32;
guard.build();
self.set_limit(guard.inner[0] as usize); // set limit to 1/2 of current request rate
} else {
log::warn!("Could not acquire heap write lock in set_reqs_sec; heap not initialized");
}
}
@@ -106,6 +111,8 @@ impl PolicyData {
atomic_store!(self.remove_limit, true);
}
self.set_limit(heap.value() as usize);
} else {
log::debug!("Could not acquire heap write lock in adjust_up; rate limit unchanged");
}
}
@@ -116,6 +123,8 @@ impl PolicyData {
heap.move_right();
self.set_limit(heap.value() as usize);
}
} else {
log::debug!("Could not acquire heap write lock in adjust_down; rate limit unchanged");
}
}
}

View File

@@ -119,25 +119,32 @@ impl Requester {
/// sleep and set a flag that can be checked by other threads
async fn cool_down(&self) {
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
// use compare_exchange to atomically check and set the flag
// this prevents the race condition where multiple threads could pass the check
// before any of them sets the flag to true
// orderings: AcqRel (acquire + release) on success, Acquire on failure
if self
.policy_data
.cooling_down
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
// Another thread is already cooling down
return;
}
atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst);
sleep(Duration::from_millis(self.policy_data.wait_time)).await;
self.ferox_scan.progress_bar().set_message("");
atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst);
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
}
/// limit the number of requests per second
pub async fn limit(&self) -> Result<()> {
let guard = self.rate_limiter.read().await;
if guard.is_some() {
guard.as_ref().unwrap().acquire_one().await;
if let Some(limiter) = guard.as_ref() {
limiter.acquire_one().await;
}
Ok(())
@@ -174,16 +181,26 @@ impl Requester {
/// - 90% of requests are 403
/// - 30% of requests are 429
fn should_enforce_policy(&self) -> Option<PolicyTrigger> {
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
// use compare_exchange to ensure only one thread can proceed with policy enforcement
// this prevents multiple threads from simultaneously deciding to enforce policy
// AcqRel provides necessary synchronization
if self
.policy_data
.cooling_down
.compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
.is_err()
{
// Another thread is already enforcing policy or cooling down
return None;
}
let requests = atomic_load!(self.handles.stats.data.requests);
if requests < max(self.handles.config.threads, 50) {
// check whether at least a full round of threads has made requests or 50 (default # of
// threads), whichever is higher
// check whether at least a full round of threads has made requests or 50
// (default # of threads), whichever is higher
// need to reset the flag since we're not actually enforcing
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
return None;
}
@@ -199,19 +216,21 @@ impl Requester {
return Some(PolicyTrigger::Status429);
}
// No policy trigger found, reset the flag
atomic_store!(self.policy_data.cooling_down, false, Ordering::Release);
None
}
/// wrapper for adjust_[up,down] functions, checks error levels to determine adjustment direction
async fn adjust_limit(&self, trigger: PolicyTrigger, create_limiter: bool) -> Result<()> {
let scan_errors = self.ferox_scan.num_errors(trigger);
let policy_errors = atomic_load!(self.policy_data.errors, Ordering::SeqCst);
let policy_errors = atomic_load!(self.policy_data.errors, Ordering::Acquire);
if let Ok(mut guard) = self.tuning_lock.try_lock() {
if scan_errors > policy_errors {
// errors have increased, need to reduce the requests/sec limit
*guard = 0; // reset streak counter to 0
if atomic_load!(self.policy_data.errors) != 0 {
if policy_errors != 0 {
self.policy_data.adjust_down();
let styled_direction = style("reduced").red();
@@ -233,6 +252,11 @@ impl Requester {
.progress_bar()
.set_message(format!("=> 🚦 {styled_direction} scan speed",));
}
} else {
log::warn!(
"Could not acquire tuning lock for {}; skipping rate adjustment",
self.target_url
);
}
if atomic_load!(self.policy_data.remove_limit) {
@@ -277,6 +301,14 @@ impl Requester {
if atomic_load!(self.policy_data.errors) == 0 {
// set original number of reqs/second the first time tune is called, skip otherwise
let reqs_sec = self.ferox_scan.requests_per_second() as usize;
// Guard against zero req/sec which would create an invalid heap
// This can happen if the scan just started or if elapsed time is 0
if reqs_sec == 0 {
log::debug!("tune: skipping initial setup due to zero req/sec");
return Ok(());
}
self.policy_data.set_reqs_sec(reqs_sec);
// set the flag to indicate that we have triggered the rate limiter
@@ -392,7 +424,7 @@ impl Requester {
logged_request(&url, method.as_str(), data, self.handles.clone()).await?;
if (should_tune || self.handles.config.auto_bail)
&& !atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst)
&& !atomic_load!(self.policy_data.cooling_down, Ordering::Acquire)
{
// only check for policy enforcement when the trigger isn't on cooldown and tuning
// or bailing is in place (should_tune used here because when auto-tune is on, we'll