From 3cb5a9b8fa1fa328722f4cbce7dd9048ebecb4a8 Mon Sep 17 00:00:00 2001 From: epi Date: Wed, 17 Feb 2021 07:03:04 -0600 Subject: [PATCH] incremental save --- src/scan_manager/scan.rs | 22 ++ src/scanner/container.rs | 2 +- src/scanner/utils.rs | 495 +++++++++++++++++++++------------------ src/statistics/macros.rs | 6 + 4 files changed, 301 insertions(+), 224 deletions(-) diff --git a/src/scan_manager/scan.rs b/src/scan_manager/scan.rs index 418eacc..6145801 100644 --- a/src/scan_manager/scan.rs +++ b/src/scan_manager/scan.rs @@ -13,6 +13,7 @@ use std::{ collections::HashMap, fmt, sync::{Arc, Mutex}, + time::Instant, }; use std::sync::atomic::{AtomicUsize, Ordering}; @@ -60,6 +61,9 @@ pub struct FeroxScan { /// tracker for total number of errors encountered by the FeroxScan instance pub(super) errors: AtomicUsize, + + /// tracker for the time at which this scan was started + pub(super) start_time: Instant } /// Default implementation for FeroxScan @@ -81,6 +85,7 @@ impl Default for FeroxScan { errors: Default::default(), status_429s: Default::default(), status_403s: Default::default(), + start_time: Instant::now(), } } } @@ -282,6 +287,23 @@ impl FeroxScan { fn status_429s(&self) -> usize { self.status_429s.load(Ordering::Relaxed) } + + /// return the number of requests per second performed by this scan's scanner + pub fn requests_per_second(&self) -> u64 { + if !self.is_active() { + return 0; + } + + let reqs = self.requests(); + let seconds = self.start_time.elapsed().as_secs(); + + reqs / seconds + } + + /// return the number of requests performed by this scan's scanner + pub fn requests(&self) -> u64 { + self.progress_bar().position() + } } /// Display implementation diff --git a/src/scanner/container.rs b/src/scanner/container.rs index 6a088ed..0354d7b 100644 --- a/src/scanner/container.rs +++ b/src/scanner/container.rs @@ -120,7 +120,7 @@ impl FeroxScanner { } } - let requester = Arc::new(Requester::from(self)?); + let requester = Arc::new(Requester::from(self, ferox_scan.clone())?); let increment_len = (self.handles.config.extensions.len() + 1) as u64; // producer tasks (mp of mpsc); responsible for making requests diff --git a/src/scanner/utils.rs b/src/scanner/utils.rs index 0057cc8..616e013 100644 --- a/src/scanner/utils.rs +++ b/src/scanner/utils.rs @@ -3,30 +3,27 @@ use crate::{ atomic_load, atomic_store, config::RequesterPolicy, event_handlers::{ - Command::{self, AddError, GetRuntime, SubtractFromUsizeField}, + Command::{self, AddError, SubtractFromUsizeField}, Handles, }, extractor::{ExtractionTarget::ResponseBody, ExtractorBuilder}, response::FeroxResponse, scan_manager::ScanStatus, - statistics::{ - StatError::Other, - StatField::{Enforced403s, Enforced429s, EnforcedErrors, TotalExpected}, - }, + statistics::{StatError::Other, StatField::TotalExpected}, url::FeroxUrl, utils::logged_request, HIGH_ERROR_RATIO, }; use anyhow::Result; -use lazy_static::lazy_static; +use crate::scan_manager::FeroxScan; +use crate::utils::ferox_print; use leaky_bucket::LeakyBucket; use std::{ cmp::max, - ops::Index, sync::{ atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, + Arc, Mutex, }, }; use tokio::{ @@ -35,15 +32,7 @@ use tokio::{ }; /// default number of milliseconds to wait during a cooldown period -const WAIT_TIME: u64 = 1250; - -lazy_static! { - /// todo doc - static ref SHOULD_TUNE: AtomicBool = AtomicBool::new(false); - - /// todo doc - static ref TUNE_TRIGGER: std::sync::Mutex = std::sync::Mutex::new(PolicyTrigger::Errors); -} +const WAIT_TIME: u64 = 3500; #[derive(Copy, Clone, PartialEq, Debug)] /// represents different situations where different criteria can trigger auto-tune/bail behavior @@ -73,6 +62,10 @@ pub struct PolicyData { /// number of errors (at last interval) errors: AtomicUsize, + /// whether or not the owning Requester has started enforcing the policy yet or not + /// (is set to true when the first should_enforce_policy branch is hit + enforcing: AtomicBool, + /// heap of values used for adjusting # of requests/second heap: std::sync::RwLock, } @@ -92,7 +85,7 @@ impl PolicyData { if let Ok(mut guard) = self.heap.write() { guard.original = reqs_sec as i32; guard.build(); - self.set_limit(guard.value() as usize); + self.set_limit(reqs_sec); } } @@ -112,49 +105,85 @@ impl PolicyData { } /// adjust the rate of requests per second up (increase rate) - fn adjust_up(&self) { + fn adjust_up(&self, url: &str, streak_counter: &usize) { // log::error!("enter: adjust up"); // todo remove - if let Ok(mut heap) = self.heap.write() { - if heap.has_children() { + if let Ok(mut heap) = self.heap.try_write() { + if *streak_counter > 2 { + let current = heap.value(); + heap.move_up(); + heap.move_up(); + if current > heap.value() { + // the tree's structure makes it so that sometimes 2 moves up results in a + // value greater than the current node's and other times we need to move 3 up + // to arrive at a greater value + if heap.has_parent() { + heap.move_up(); + } else { + // been here enough that we can try resuming the scan to its original + // speed (no limiting at all) + heap.inner[0] = heap.original; + } + self.set_limit(heap.value() as usize); + log::error!( + "[STREAK UP ({} errors)] {} cur: {} new: {} orig: {}", + atomic_load!(self.errors), + url, + current, + heap.value(), + heap.original, + ); // todo remove + } + } else if heap.has_children() { let old_limit = heap.value(); // todo remove heap.move_left(); self.set_limit(heap.value() as usize); - // log::error!( - // "[UP ({})] current limit: {} new limit: {}", - // atomic_load!(self.errors), - // old_limit, - // heap.value() - // ); // todo remove + log::error!( + "[UP ({} errors)] {} cur: {} new: {} orig: {}", + atomic_load!(self.errors), + url, + old_limit, + heap.value(), + heap.original + ); // todo remove } else { - let old_limit = heap.value(); // todo remove + let current = heap.value(); // todo remove heap.move_up(); + heap.move_up(); + if current > heap.value() { + heap.move_up(); + } self.set_limit(heap.value() as usize); - // log::error!( - // "[UP ({})] current limit: {} new limit: {}", - // atomic_load!(self.errors), - // old_limit, - // heap.value() - // ); // todo remove + log::error!( + "[BOTTOMED-OUT UP ({} errors)] {} cur: {} new: {} orig: {}", + atomic_load!(self.errors), + url, + current, + heap.value(), + heap.original + ); // todo remove } } // log::error!("exit: adjust up"); // todo remove } /// adjust the rate of requests per second down (decrease rate) - fn adjust_down(&self) { + fn adjust_down(&self, target_url: &str, url: &str) { // log::error!("enter: adjust down: {:?}", self); // todo remove - if let Ok(mut heap) = self.heap.write() { + if let Ok(mut heap) = self.heap.try_write() { if heap.has_children() { let old_limit = heap.value(); // todo remove heap.move_right(); self.set_limit(heap.value() as usize); - // log::error!( - // "[DOWN ({})] current limit: {} new limit: {}", - // atomic_load!(self.errors), - // old_limit, - // heap.value() - // ); // todo remove + log::error!( + "[DOWN ({} errors)] {} ({}) cur: {} new: {} orig: {}", + atomic_load!(self.errors), + target_url, + url, + old_limit, + heap.value(), + heap.original + ); // todo remove } } // log::error!("exit: adjust down"); // todo remove @@ -333,12 +362,22 @@ pub(super) struct Requester { /// data regarding policy and metadata about last enforced trigger etc... policy_data: PolicyData, + + /// FeroxScan associated with the creation of this Requester + ferox_scan: Arc, + + /// simple lock to control access to tuning to a single thread (per-scan) + /// + /// need a usize to determine the number of consecutive non-error calls that a requester has + /// seen; this will satisfy the non-mut self constraint (due to us being behind an Arc, and + /// the need for a counter + tuning_lock: Mutex, } /// Requester implementation impl Requester { /// given a FeroxScanner, create a Requester - pub fn from(scanner: &FeroxScanner) -> Result { + pub fn from(scanner: &FeroxScanner, ferox_scan: Arc) -> Result { let limit = scanner.handles.config.rate_limit; let rate_limiter = if limit > 0 { @@ -350,10 +389,12 @@ impl Requester { let policy_data = PolicyData::new(scanner.handles.config.requester_policy); Ok(Self { + ferox_scan, policy_data, rate_limiter: RwLock::new(rate_limiter), handles: scanner.handles.clone(), target_url: scanner.target_url.to_owned(), + tuning_lock: Mutex::new(0), }) } @@ -373,27 +414,21 @@ impl Requester { /// query the statistics handler in order to get the (current) number of requests/second async fn get_reqs_sec(&self) -> Result { - let reqs = atomic_load!(self.handles.stats.data.requests) as f64; - - let (tx, rx) = oneshot::channel::(); - self.handles.stats.send(GetRuntime(tx))?; - let secs = rx.await?; - - Ok(reqs / secs) + Ok(self.ferox_scan.requests_per_second() as f64) // todo arguable that this needs to be a fn } /// sleep and set a flag that can be checked by other threads async fn cool_down(&self, wait_time: u64) { - if atomic_load!(self.policy_data.cooling_down) { + if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) { // prevents a few racy threads making it in here and doubling the wait time erroneously return; } - atomic_store!(self.policy_data.cooling_down, true); + atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst); sleep(Duration::from_millis(wait_time)).await; - atomic_store!(self.policy_data.cooling_down, false); + atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst); } /// limit the number of requests per second @@ -408,18 +443,34 @@ impl Requester { Ok(()) } - /// simple wrapper to update the appropriate usize field on the Stats object - fn update_error_field(&self, num_errors: usize, trigger: PolicyTrigger) { - let field = match trigger { - PolicyTrigger::Status403 => Enforced403s, - PolicyTrigger::Status429 => Enforced429s, - PolicyTrigger::Errors => EnforcedErrors, + /// small function to break out different error checking mechanisms + fn too_many_errors(&self) -> bool { + let total = self.ferox_scan.num_errors(PolicyTrigger::Errors); + + let (threads, tmax) = match self.policy_data.policy { + // RequesterPolicy::AutoBail => (self.handles.config.threads / 2, 25), + _ => (self.handles.config.threads / 2, 25), + // _ => (self.handles.config.threads, 50), // todo if unused, remove lint / update comment below }; - self.handles - .stats - .data - .update_usize_field(field, num_errors); + // at least 50 errors + let threshold = max(threads, tmax); + + total >= threshold + } + + /// small function to break out different error checking mechanisms + fn too_many_status_errors(&self, trigger: PolicyTrigger) -> bool { + let total = self.ferox_scan.num_errors(trigger); + let requests = self.ferox_scan.requests(); + + let ratio = total as f64 / requests as f64; + + match trigger { + PolicyTrigger::Status403 => ratio >= HIGH_ERROR_RATIO, + PolicyTrigger::Status429 => ratio >= HIGH_ERROR_RATIO / 3.0, + _ => false, + } } /// determine whether or not a policy needs to be enforced @@ -429,7 +480,7 @@ impl Requester { /// - 90% of requests are 403 /// - 30% of requests are 429 fn should_enforce_policy(&self) -> Option { - if atomic_load!(self.policy_data.cooling_down) { + if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) { // prevents a few racy threads making it in here and doubling the wait time erroneously return None; } @@ -442,40 +493,15 @@ impl Requester { return None; } - let total_errors = self.handles.stats.data.errors(); - let enforced_errors = self.handles.stats.data.enforced_errors(); - - let unenforced_errors = total_errors.saturating_sub(enforced_errors); - - // at least 50 errors - let threshold = max(self.handles.config.threads, 50); - - if unenforced_errors >= threshold { - self.update_error_field(unenforced_errors, PolicyTrigger::Errors); + if self.too_many_errors() { return Some(PolicyTrigger::Errors); } - let total_403s = self.handles.stats.data.status_403s(); - let enforced_403s = self.handles.stats.data.enforced_403s(); - - let unenforced_403s = total_403s.saturating_sub(enforced_403s); - - let ratio_403s = unenforced_403s as f64 / requests as f64; - if ratio_403s >= HIGH_ERROR_RATIO { - // almost exclusively 403 - self.update_error_field(unenforced_403s, PolicyTrigger::Status403); + if self.too_many_status_errors(PolicyTrigger::Status403) { return Some(PolicyTrigger::Status403); } - let total_429s = self.handles.stats.data.status_429s(); - let enforced_429s = self.handles.stats.data.enforced_429s(); - - let unenforced_429s = total_429s.saturating_sub(enforced_429s); - - let ratio_429s = unenforced_429s as f64 / requests as f64; - if ratio_429s >= HIGH_ERROR_RATIO / 3.0 { - // high # of 429 responses - self.update_error_field(unenforced_429s, PolicyTrigger::Status429); + if self.too_many_status_errors(PolicyTrigger::Status429) { return Some(PolicyTrigger::Status429); } @@ -484,155 +510,167 @@ impl Requester { /// query the statistics handler for the current number of errors based on the given policy async fn get_errors_by_policy(&self, trigger: PolicyTrigger) -> Result { - match (self.policy_data.policy, trigger) { - (RequesterPolicy::AutoBail, PolicyTrigger::Status403) => { - Ok(self.handles.stats.data.status_403s()) - } - (RequesterPolicy::AutoBail, PolicyTrigger::Status429) => { - Ok(self.handles.stats.data.status_429s()) - } - (RequesterPolicy::AutoBail, PolicyTrigger::Errors) => { - Ok(self.handles.stats.data.errors()) - } - (RequesterPolicy::AutoTune, _) => { - // todo unwrap etc - let errors = self - .handles - .ferox_scans() - .unwrap() - .get_scan_by_url(&self.target_url) - .unwrap() - .num_errors(trigger); - Ok(errors) - } - // (RequesterPolicy::AutoTune, PolicyTrigger::Status429) => { - // // todo unwrap etc - // let errors = self.handles.ferox_scans().unwrap().get_scan_by_url(&self.target_url).unwrap().num_errors(trigger); - // Ok(errors) - // } - // (RequesterPolicy::AutoTune, PolicyTrigger::Errors) => { - // // todo unwrap etc - // let errors = self.handles.ferox_scans().unwrap().get_scan_by_url(&self.target_url).unwrap().num_errors(trigger); - // Ok(errors) - // } - // todo autotune error checking isn't quite right, it's checking overall errors then whether - // or not its personal errors are > or ==, which often leads to over-adjusting down (probably) - (RequesterPolicy::Default, _) => Ok(0), - } + Ok(self.ferox_scan.num_errors(trigger)) } /// todo doc - async fn adjust_limit(&self, trigger: PolicyTrigger) -> Result<()> { + async fn adjust_limit(&self, trigger: PolicyTrigger, url: &str) -> Result<()> { let errors = self.get_errors_by_policy(trigger).await?; - // log::error!("[ADJUST ({})] {}", errors, self.target_url); // todo remove + let policy_errors = atomic_load!(self.policy_data.errors, Ordering::SeqCst); - if errors > atomic_load!(self.policy_data.errors) { - // errors have increased, need to reduce the requests/sec limit - self.policy_data.adjust_down(); - self.policy_data.set_errors(errors); - } else { - // errors can only be incremented, so an else is sufficient - self.policy_data.adjust_up(); + if let Ok(mut guard) = self.tuning_lock.try_lock() { + if errors > policy_errors { + // errors have increased, need to reduce the requests/sec limit + *guard = 0; // reset streak counter to 0 + if atomic_load!(self.policy_data.errors) == 0 { + log::error!( + "[ADJUSTLIMIT FIRST] {} scan errs: {} pol errs: {} limit: {} limiter: {:?} orig: {}", + self.target_url, + errors, + policy_errors, + self.policy_data.get_limit(), + self.rate_limiter, + self.policy_data.heap.read().unwrap().original + ); // todo remove + self.policy_data.set_errors(errors); + } else { + log::error!( + "[ADJUSTLIMIT] {} scan errs: {} pol errs: {} limit: {} limiter: {:?} orig: {}", + self.target_url, + errors, + policy_errors, + self.policy_data.get_limit(), + self.rate_limiter, + self.policy_data.heap.read().unwrap().original + ); // todo remove + + self.policy_data.set_errors(errors); + self.policy_data.adjust_down(&self.target_url, url); // todo remove url + } + } else { + // errors can only be incremented, so an else is sufficient + *guard += 1; + self.policy_data.adjust_up(&self.target_url, &*guard); + } } - - self.set_rate_limiter().await?; - + log::error!( + "[POST-CREATE] {} {:?}", + self.target_url, + self.rate_limiter.read().await + ); + self.set_rate_limiter(0, false).await?; // todo remove params probably + log::error!( + "[POST-SET-NEW] {} {:?}", + self.target_url, + self.rate_limiter.read().await + ); Ok(()) } /// lock the rate limiter and set its value to ta new leaky_bucket - async fn set_rate_limiter(&self) -> Result<()> { - let new_bucket = Self::build_a_bucket(self.policy_data.get_limit())?; + async fn set_rate_limiter(&self, new_limit: usize, initial_tune: bool) -> Result<()> { + let current = self.policy_data.get_limit(); // todo initial_tune not needed, just feed a value that isnt the current limit + + if new_limit == current && !initial_tune { + // this function is called more often than i'd prefer due to Send requirements of + // mutex/rwlock primitives and awaits, this will minimize the cost of the extra calls + return Ok(()); + } + + let new_bucket = Self::build_a_bucket(current)?; // todo rename current if params removed let mut guard = self.rate_limiter.write().await; let _ = std::mem::replace(&mut *guard, Some(new_bucket)); Ok(()) } /// enforce auto-tune policy - async fn tune(&self, trigger: PolicyTrigger) -> Result<()> { - if self.rate_limiter.read().await.is_none() { + async fn tune(&self, trigger: PolicyTrigger, url: &str) -> Result<()> { + // todo 1: first-down and down times are extemely close together, still results in a + // super quick reduction to a quarter of the scan speed instead of a half + // todo 2: check for lock contention, feels like the tune checks aren't happening as often + // as they should be + // let enforcing = atomic_load!(self.policy_data.enforcing, Ordering::SeqCst); + // if !enforcing { + // atomic_store!(self.policy_data.enforcing, true, Ordering::SeqCst); + // atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst); + // // set original number of reqs/second the first time tune is called, skip otherwise + // let reqs_sec = self.get_reqs_sec().await? as usize; + // self.policy_data.set_reqs_sec(reqs_sec.clone()); + // + // self.set_rate_limiter(reqs_sec, true).await?; + // let errors = self.get_errors_by_policy(trigger).await?; + // self.policy_data.set_errors(errors); + // + // atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst); + // self.cool_down(WAIT_TIME).await; + // log::error!( + // "[FIRST DOWN ({} errors)] {} ({}) cur: {} new: {}", + // errors, + // self.target_url, + // url, + // self.policy_data.get_limit(), + // reqs_sec + // ); // todo remove + // } else { + // // todo try putting this in the same block and only guarding against these two calls? + // let reqs_sec = self.get_reqs_sec().await? as usize; + // self.policy_data.set_reqs_sec(reqs_sec.clone()); + // + // self.adjust_limit(trigger, url).await?; // todo remove url + // + // self.cool_down(WAIT_TIME).await; + // + // // todo consider setting a 'tune' flag that prevents checking should_enforce, the thought + // // being that once it's a yes, it's always a yes and tuning should be a time based thing + // } + if atomic_load!(self.policy_data.errors) == 0 { // set original number of reqs/second the first time tune is called, skip otherwise let reqs_sec = self.get_reqs_sec().await? as usize; - self.policy_data.set_reqs_sec(reqs_sec); - self.set_rate_limiter().await?; + self.policy_data.set_reqs_sec(reqs_sec.clone()); + log::error!("[PRE-CREATE] {:?}", self.rate_limiter.read().await); + self.set_rate_limiter(0, true).await?; // todo remove new_limit and initial from fn call (maybe) } - - self.adjust_limit(trigger).await?; + self.adjust_limit(trigger, url).await?; self.cool_down(WAIT_TIME).await; - // todo consider setting a 'tune' flag that prevents checking should_enforce, the thought - // being that once it's a yes, it's always a yes and tuning should be a time based thing - Ok(()) } /// enforce auto-bail policy async fn bail(&self, trigger: PolicyTrigger) -> Result<()> { - let scans = self.handles.ferox_scans()?; + if self.ferox_scan.is_active() { + log::warn!( + "too many {:?} ({}) triggered {:?} Policy on {}", + trigger, + self.ferox_scan.num_errors(trigger), + self.handles.config.requester_policy, + self.ferox_scan + ); - let mut scan_tuples = vec![]; + // if allowed to be called within .abort, the inner .await makes it so other + // in-flight requests don't see the Cancelled status, doing it here ensures a + // minimum number of requests entering this block + self.ferox_scan + .set_status(ScanStatus::Cancelled) + .unwrap_or_else(|e| log::warn!("Could not set scan status: {}", e)); - if let Ok(guard) = scans.scans.read() { - for (i, scan) in guard.iter().enumerate() { - if scan.is_active() && scan.num_errors(trigger) > 0 { - // only active scans that have at least 1 error - scan_tuples.push((i, scan.num_errors(trigger))); - } - } - } + // kill the scan + self.ferox_scan + .abort() + .await + .unwrap_or_else(|e| log::warn!("Could not bail on scan: {}", e)); - if scan_tuples.is_empty() { - return Ok(()); - } + // figure out how many requests are skipped as a result + let pb = self.ferox_scan.progress_bar(); + let num_skipped = pb.length().saturating_sub(pb.position()) as usize; - // sort by number of errors - scan_tuples.sort_unstable_by(|x, y| y.1.cmp(&x.1)); - - for (idx, _) in scan_tuples { - let scan = if let Ok(guard) = scans.scans.read() { - guard.index(idx).clone() - } else { - log::warn!("Could not acquire the FeroxScans.scans lock"); - continue; - }; - - if scan.is_active() { - log::debug!( - "too many {:?} ({}) triggered {:?} Policy on {}", - trigger, - scan.num_errors(trigger), - self.handles.config.requester_policy, - scan - ); - - // if allowed to be called within .abort, the inner .await makes it so other - // in-flight requests don't see the Cancelled status, doing it here ensures a - // minimum number of requests entering this block - scan.set_status(ScanStatus::Cancelled) - .unwrap_or_else(|e| log::warn!("Could not set scan status: {}", e)); - - // set cooldown flag before awaiting the abort to reduce chance of races - self.cool_down(1500).await; - - // kill the scan - scan.abort() - .await - .unwrap_or_else(|e| log::warn!("Could not bail on scan: {}", e)); - - // figure out how many requests are skipped as a result - let pb = scan.progress_bar(); - let num_skipped = pb.length().saturating_sub(pb.position()) as usize; - - // update the overall scan bar by subtracting the number of skipped requests from - // the total - self.handles - .stats - .send(SubtractFromUsizeField(TotalExpected, num_skipped)) - .unwrap_or_else(|e| log::warn!("Could not update overall scan bar: {}", e)); - - break; - } + // update the overall scan bar by subtracting the number of skipped requests from + // the total + self.handles + .stats + .send(SubtractFromUsizeField(TotalExpected, num_skipped)) + .unwrap_or_else(|e| log::warn!("Could not update overall scan bar: {}", e)); } Ok(()) @@ -648,7 +686,11 @@ impl Requester { FeroxUrl::from_string(&self.target_url, self.handles.clone()).formatted_urls(word)?; for url in urls { - let should_limit = self.rate_limiter.read().await.is_some(); + // auto_tune is true, or rate_limit was set (mutually exclusive to user) + // and a rate_limiter has been created + // short-circuiting the lock access behind the first boolean check + let should_tune = self.handles.config.auto_tune || self.handles.config.rate_limit > 0; + let should_limit = should_tune && self.rate_limiter.read().await.is_some(); if should_limit { // found a rate limiter, limit that junk! @@ -660,20 +702,27 @@ impl Requester { let response = logged_request(&url, self.handles.clone()).await?; - if !atomic_load!(self.policy_data.cooling_down) { - // only check for policy enforcement when the trigger isn't on cooldown + if (should_tune || self.handles.config.auto_bail) + && !atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) + { + // only check for policy enforcement when the trigger isn't on cooldown and tuning + // or bailing is in place (should_tune used here because when auto-tune is on, we'll + // reach this without a rate_limiter in place) match self.policy_data.policy { RequesterPolicy::AutoTune => { // todo check for tune flag and short-circuit the enforce call - if atomic_load!(SHOULD_TUNE) { - let trigger = *TUNE_TRIGGER.lock().unwrap(); - self.tune(trigger).await?; // todo may or may not be right to bubble up - } else if let Some(trigger) = self.should_enforce_policy() { - if let Ok(mut guard) = TUNE_TRIGGER.lock() { - *guard = trigger; - } - atomic_store!(SHOULD_TUNE, true); - self.tune(trigger).await?; // todo may or may not be right to bubble up + // if atomic_load!(SHOULD_TUNE) { + // let trigger = *TUNE_TRIGGER.lock().unwrap(); + // self.tune(trigger).await?; // todo may or may not be right to bubble up + // } else if let Some(trigger) = self.should_enforce_policy() { + // if let Ok(mut guard) = TUNE_TRIGGER.lock() { + // *guard = trigger; + // } + // atomic_store!(SHOULD_TUNE, true); + // self.tune(trigger).await?; // todo may or may not be right to bubble up + // } + if let Some(trigger) = self.should_enforce_policy() { + self.tune(trigger, &url.to_string()).await?; // todo bubble up? / remove url } } RequesterPolicy::AutoBail => { @@ -684,7 +733,7 @@ impl Requester { RequesterPolicy::Default => {} } } - // todo requests/second on scan bar aren't showing + // response came back without error, convert it to FeroxResponse let ferox_response = FeroxResponse::from(response, true, self.handles.config.output_level).await; diff --git a/src/statistics/macros.rs b/src/statistics/macros.rs index 0dc0f1a..088ad01 100644 --- a/src/statistics/macros.rs +++ b/src/statistics/macros.rs @@ -20,6 +20,9 @@ macro_rules! atomic_load { ($metric:expr) => { $metric.load(Ordering::Relaxed); }; + ($metric:expr, $ordering:expr) => { + $metric.load($ordering); + }; } /// Wrapper around `Atomic*.store` to save me from writing Ordering::Relaxed a bajillion times @@ -28,4 +31,7 @@ macro_rules! atomic_store { ($metric:expr, $value:expr) => { $metric.store($value, Ordering::Relaxed); }; + ($metric:expr, $value:expr, $ordering:expr) => { + $metric.store($value, $ordering); + }; }