mirror of
https://github.com/epi052/feroxbuster.git
synced 2026-05-29 10:31:12 -03:00
incremental save
This commit is contained in:
@@ -13,6 +13,7 @@ use std::{
|
||||
collections::HashMap,
|
||||
fmt,
|
||||
sync::{Arc, Mutex},
|
||||
time::Instant,
|
||||
};
|
||||
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
@@ -60,6 +61,9 @@ pub struct FeroxScan {
|
||||
|
||||
/// tracker for total number of errors encountered by the FeroxScan instance
|
||||
pub(super) errors: AtomicUsize,
|
||||
|
||||
/// tracker for the time at which this scan was started
|
||||
pub(super) start_time: Instant
|
||||
}
|
||||
|
||||
/// Default implementation for FeroxScan
|
||||
@@ -81,6 +85,7 @@ impl Default for FeroxScan {
|
||||
errors: Default::default(),
|
||||
status_429s: Default::default(),
|
||||
status_403s: Default::default(),
|
||||
start_time: Instant::now(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -282,6 +287,23 @@ impl FeroxScan {
|
||||
fn status_429s(&self) -> usize {
|
||||
self.status_429s.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
/// return the number of requests per second performed by this scan's scanner
|
||||
pub fn requests_per_second(&self) -> u64 {
|
||||
if !self.is_active() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
let reqs = self.requests();
|
||||
let seconds = self.start_time.elapsed().as_secs();
|
||||
|
||||
reqs / seconds
|
||||
}
|
||||
|
||||
/// return the number of requests performed by this scan's scanner
|
||||
pub fn requests(&self) -> u64 {
|
||||
self.progress_bar().position()
|
||||
}
|
||||
}
|
||||
|
||||
/// Display implementation
|
||||
|
||||
@@ -120,7 +120,7 @@ impl FeroxScanner {
|
||||
}
|
||||
}
|
||||
|
||||
let requester = Arc::new(Requester::from(self)?);
|
||||
let requester = Arc::new(Requester::from(self, ferox_scan.clone())?);
|
||||
let increment_len = (self.handles.config.extensions.len() + 1) as u64;
|
||||
|
||||
// producer tasks (mp of mpsc); responsible for making requests
|
||||
|
||||
@@ -3,30 +3,27 @@ use crate::{
|
||||
atomic_load, atomic_store,
|
||||
config::RequesterPolicy,
|
||||
event_handlers::{
|
||||
Command::{self, AddError, GetRuntime, SubtractFromUsizeField},
|
||||
Command::{self, AddError, SubtractFromUsizeField},
|
||||
Handles,
|
||||
},
|
||||
extractor::{ExtractionTarget::ResponseBody, ExtractorBuilder},
|
||||
response::FeroxResponse,
|
||||
scan_manager::ScanStatus,
|
||||
statistics::{
|
||||
StatError::Other,
|
||||
StatField::{Enforced403s, Enforced429s, EnforcedErrors, TotalExpected},
|
||||
},
|
||||
statistics::{StatError::Other, StatField::TotalExpected},
|
||||
url::FeroxUrl,
|
||||
utils::logged_request,
|
||||
HIGH_ERROR_RATIO,
|
||||
};
|
||||
use anyhow::Result;
|
||||
|
||||
use lazy_static::lazy_static;
|
||||
use crate::scan_manager::FeroxScan;
|
||||
use crate::utils::ferox_print;
|
||||
use leaky_bucket::LeakyBucket;
|
||||
use std::{
|
||||
cmp::max,
|
||||
ops::Index,
|
||||
sync::{
|
||||
atomic::{AtomicBool, AtomicUsize, Ordering},
|
||||
Arc,
|
||||
Arc, Mutex,
|
||||
},
|
||||
};
|
||||
use tokio::{
|
||||
@@ -35,15 +32,7 @@ use tokio::{
|
||||
};
|
||||
|
||||
/// default number of milliseconds to wait during a cooldown period
|
||||
const WAIT_TIME: u64 = 1250;
|
||||
|
||||
lazy_static! {
|
||||
/// todo doc
|
||||
static ref SHOULD_TUNE: AtomicBool = AtomicBool::new(false);
|
||||
|
||||
/// todo doc
|
||||
static ref TUNE_TRIGGER: std::sync::Mutex<PolicyTrigger> = std::sync::Mutex::new(PolicyTrigger::Errors);
|
||||
}
|
||||
const WAIT_TIME: u64 = 3500;
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Debug)]
|
||||
/// represents different situations where different criteria can trigger auto-tune/bail behavior
|
||||
@@ -73,6 +62,10 @@ pub struct PolicyData {
|
||||
/// number of errors (at last interval)
|
||||
errors: AtomicUsize,
|
||||
|
||||
/// whether or not the owning Requester has started enforcing the policy yet or not
|
||||
/// (is set to true when the first should_enforce_policy branch is hit
|
||||
enforcing: AtomicBool,
|
||||
|
||||
/// heap of values used for adjusting # of requests/second
|
||||
heap: std::sync::RwLock<LimitHeap>,
|
||||
}
|
||||
@@ -92,7 +85,7 @@ impl PolicyData {
|
||||
if let Ok(mut guard) = self.heap.write() {
|
||||
guard.original = reqs_sec as i32;
|
||||
guard.build();
|
||||
self.set_limit(guard.value() as usize);
|
||||
self.set_limit(reqs_sec);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -112,49 +105,85 @@ impl PolicyData {
|
||||
}
|
||||
|
||||
/// adjust the rate of requests per second up (increase rate)
|
||||
fn adjust_up(&self) {
|
||||
fn adjust_up(&self, url: &str, streak_counter: &usize) {
|
||||
// log::error!("enter: adjust up"); // todo remove
|
||||
if let Ok(mut heap) = self.heap.write() {
|
||||
if heap.has_children() {
|
||||
if let Ok(mut heap) = self.heap.try_write() {
|
||||
if *streak_counter > 2 {
|
||||
let current = heap.value();
|
||||
heap.move_up();
|
||||
heap.move_up();
|
||||
if current > heap.value() {
|
||||
// the tree's structure makes it so that sometimes 2 moves up results in a
|
||||
// value greater than the current node's and other times we need to move 3 up
|
||||
// to arrive at a greater value
|
||||
if heap.has_parent() {
|
||||
heap.move_up();
|
||||
} else {
|
||||
// been here enough that we can try resuming the scan to its original
|
||||
// speed (no limiting at all)
|
||||
heap.inner[0] = heap.original;
|
||||
}
|
||||
self.set_limit(heap.value() as usize);
|
||||
log::error!(
|
||||
"[STREAK UP ({} errors)] {} cur: {} new: {} orig: {}",
|
||||
atomic_load!(self.errors),
|
||||
url,
|
||||
current,
|
||||
heap.value(),
|
||||
heap.original,
|
||||
); // todo remove
|
||||
}
|
||||
} else if heap.has_children() {
|
||||
let old_limit = heap.value(); // todo remove
|
||||
heap.move_left();
|
||||
self.set_limit(heap.value() as usize);
|
||||
// log::error!(
|
||||
// "[UP ({})] current limit: {} new limit: {}",
|
||||
// atomic_load!(self.errors),
|
||||
// old_limit,
|
||||
// heap.value()
|
||||
// ); // todo remove
|
||||
log::error!(
|
||||
"[UP ({} errors)] {} cur: {} new: {} orig: {}",
|
||||
atomic_load!(self.errors),
|
||||
url,
|
||||
old_limit,
|
||||
heap.value(),
|
||||
heap.original
|
||||
); // todo remove
|
||||
} else {
|
||||
let old_limit = heap.value(); // todo remove
|
||||
let current = heap.value(); // todo remove
|
||||
heap.move_up();
|
||||
heap.move_up();
|
||||
if current > heap.value() {
|
||||
heap.move_up();
|
||||
}
|
||||
self.set_limit(heap.value() as usize);
|
||||
// log::error!(
|
||||
// "[UP ({})] current limit: {} new limit: {}",
|
||||
// atomic_load!(self.errors),
|
||||
// old_limit,
|
||||
// heap.value()
|
||||
// ); // todo remove
|
||||
log::error!(
|
||||
"[BOTTOMED-OUT UP ({} errors)] {} cur: {} new: {} orig: {}",
|
||||
atomic_load!(self.errors),
|
||||
url,
|
||||
current,
|
||||
heap.value(),
|
||||
heap.original
|
||||
); // todo remove
|
||||
}
|
||||
}
|
||||
// log::error!("exit: adjust up"); // todo remove
|
||||
}
|
||||
|
||||
/// adjust the rate of requests per second down (decrease rate)
|
||||
fn adjust_down(&self) {
|
||||
fn adjust_down(&self, target_url: &str, url: &str) {
|
||||
// log::error!("enter: adjust down: {:?}", self); // todo remove
|
||||
|
||||
if let Ok(mut heap) = self.heap.write() {
|
||||
if let Ok(mut heap) = self.heap.try_write() {
|
||||
if heap.has_children() {
|
||||
let old_limit = heap.value(); // todo remove
|
||||
heap.move_right();
|
||||
self.set_limit(heap.value() as usize);
|
||||
// log::error!(
|
||||
// "[DOWN ({})] current limit: {} new limit: {}",
|
||||
// atomic_load!(self.errors),
|
||||
// old_limit,
|
||||
// heap.value()
|
||||
// ); // todo remove
|
||||
log::error!(
|
||||
"[DOWN ({} errors)] {} ({}) cur: {} new: {} orig: {}",
|
||||
atomic_load!(self.errors),
|
||||
target_url,
|
||||
url,
|
||||
old_limit,
|
||||
heap.value(),
|
||||
heap.original
|
||||
); // todo remove
|
||||
}
|
||||
}
|
||||
// log::error!("exit: adjust down"); // todo remove
|
||||
@@ -333,12 +362,22 @@ pub(super) struct Requester {
|
||||
|
||||
/// data regarding policy and metadata about last enforced trigger etc...
|
||||
policy_data: PolicyData,
|
||||
|
||||
/// FeroxScan associated with the creation of this Requester
|
||||
ferox_scan: Arc<FeroxScan>,
|
||||
|
||||
/// simple lock to control access to tuning to a single thread (per-scan)
|
||||
///
|
||||
/// need a usize to determine the number of consecutive non-error calls that a requester has
|
||||
/// seen; this will satisfy the non-mut self constraint (due to us being behind an Arc, and
|
||||
/// the need for a counter
|
||||
tuning_lock: Mutex<usize>,
|
||||
}
|
||||
|
||||
/// Requester implementation
|
||||
impl Requester {
|
||||
/// given a FeroxScanner, create a Requester
|
||||
pub fn from(scanner: &FeroxScanner) -> Result<Self> {
|
||||
pub fn from(scanner: &FeroxScanner, ferox_scan: Arc<FeroxScan>) -> Result<Self> {
|
||||
let limit = scanner.handles.config.rate_limit;
|
||||
|
||||
let rate_limiter = if limit > 0 {
|
||||
@@ -350,10 +389,12 @@ impl Requester {
|
||||
let policy_data = PolicyData::new(scanner.handles.config.requester_policy);
|
||||
|
||||
Ok(Self {
|
||||
ferox_scan,
|
||||
policy_data,
|
||||
rate_limiter: RwLock::new(rate_limiter),
|
||||
handles: scanner.handles.clone(),
|
||||
target_url: scanner.target_url.to_owned(),
|
||||
tuning_lock: Mutex::new(0),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -373,27 +414,21 @@ impl Requester {
|
||||
|
||||
/// query the statistics handler in order to get the (current) number of requests/second
|
||||
async fn get_reqs_sec(&self) -> Result<f64> {
|
||||
let reqs = atomic_load!(self.handles.stats.data.requests) as f64;
|
||||
|
||||
let (tx, rx) = oneshot::channel::<f64>();
|
||||
self.handles.stats.send(GetRuntime(tx))?;
|
||||
let secs = rx.await?;
|
||||
|
||||
Ok(reqs / secs)
|
||||
Ok(self.ferox_scan.requests_per_second() as f64) // todo arguable that this needs to be a fn
|
||||
}
|
||||
|
||||
/// sleep and set a flag that can be checked by other threads
|
||||
async fn cool_down(&self, wait_time: u64) {
|
||||
if atomic_load!(self.policy_data.cooling_down) {
|
||||
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
|
||||
// prevents a few racy threads making it in here and doubling the wait time erroneously
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_store!(self.policy_data.cooling_down, true);
|
||||
atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst);
|
||||
|
||||
sleep(Duration::from_millis(wait_time)).await;
|
||||
|
||||
atomic_store!(self.policy_data.cooling_down, false);
|
||||
atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
/// limit the number of requests per second
|
||||
@@ -408,18 +443,34 @@ impl Requester {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// simple wrapper to update the appropriate usize field on the Stats object
|
||||
fn update_error_field(&self, num_errors: usize, trigger: PolicyTrigger) {
|
||||
let field = match trigger {
|
||||
PolicyTrigger::Status403 => Enforced403s,
|
||||
PolicyTrigger::Status429 => Enforced429s,
|
||||
PolicyTrigger::Errors => EnforcedErrors,
|
||||
/// small function to break out different error checking mechanisms
|
||||
fn too_many_errors(&self) -> bool {
|
||||
let total = self.ferox_scan.num_errors(PolicyTrigger::Errors);
|
||||
|
||||
let (threads, tmax) = match self.policy_data.policy {
|
||||
// RequesterPolicy::AutoBail => (self.handles.config.threads / 2, 25),
|
||||
_ => (self.handles.config.threads / 2, 25),
|
||||
// _ => (self.handles.config.threads, 50), // todo if unused, remove lint / update comment below
|
||||
};
|
||||
|
||||
self.handles
|
||||
.stats
|
||||
.data
|
||||
.update_usize_field(field, num_errors);
|
||||
// at least 50 errors
|
||||
let threshold = max(threads, tmax);
|
||||
|
||||
total >= threshold
|
||||
}
|
||||
|
||||
/// small function to break out different error checking mechanisms
|
||||
fn too_many_status_errors(&self, trigger: PolicyTrigger) -> bool {
|
||||
let total = self.ferox_scan.num_errors(trigger);
|
||||
let requests = self.ferox_scan.requests();
|
||||
|
||||
let ratio = total as f64 / requests as f64;
|
||||
|
||||
match trigger {
|
||||
PolicyTrigger::Status403 => ratio >= HIGH_ERROR_RATIO,
|
||||
PolicyTrigger::Status429 => ratio >= HIGH_ERROR_RATIO / 3.0,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
/// determine whether or not a policy needs to be enforced
|
||||
@@ -429,7 +480,7 @@ impl Requester {
|
||||
/// - 90% of requests are 403
|
||||
/// - 30% of requests are 429
|
||||
fn should_enforce_policy(&self) -> Option<PolicyTrigger> {
|
||||
if atomic_load!(self.policy_data.cooling_down) {
|
||||
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
|
||||
// prevents a few racy threads making it in here and doubling the wait time erroneously
|
||||
return None;
|
||||
}
|
||||
@@ -442,40 +493,15 @@ impl Requester {
|
||||
return None;
|
||||
}
|
||||
|
||||
let total_errors = self.handles.stats.data.errors();
|
||||
let enforced_errors = self.handles.stats.data.enforced_errors();
|
||||
|
||||
let unenforced_errors = total_errors.saturating_sub(enforced_errors);
|
||||
|
||||
// at least 50 errors
|
||||
let threshold = max(self.handles.config.threads, 50);
|
||||
|
||||
if unenforced_errors >= threshold {
|
||||
self.update_error_field(unenforced_errors, PolicyTrigger::Errors);
|
||||
if self.too_many_errors() {
|
||||
return Some(PolicyTrigger::Errors);
|
||||
}
|
||||
|
||||
let total_403s = self.handles.stats.data.status_403s();
|
||||
let enforced_403s = self.handles.stats.data.enforced_403s();
|
||||
|
||||
let unenforced_403s = total_403s.saturating_sub(enforced_403s);
|
||||
|
||||
let ratio_403s = unenforced_403s as f64 / requests as f64;
|
||||
if ratio_403s >= HIGH_ERROR_RATIO {
|
||||
// almost exclusively 403
|
||||
self.update_error_field(unenforced_403s, PolicyTrigger::Status403);
|
||||
if self.too_many_status_errors(PolicyTrigger::Status403) {
|
||||
return Some(PolicyTrigger::Status403);
|
||||
}
|
||||
|
||||
let total_429s = self.handles.stats.data.status_429s();
|
||||
let enforced_429s = self.handles.stats.data.enforced_429s();
|
||||
|
||||
let unenforced_429s = total_429s.saturating_sub(enforced_429s);
|
||||
|
||||
let ratio_429s = unenforced_429s as f64 / requests as f64;
|
||||
if ratio_429s >= HIGH_ERROR_RATIO / 3.0 {
|
||||
// high # of 429 responses
|
||||
self.update_error_field(unenforced_429s, PolicyTrigger::Status429);
|
||||
if self.too_many_status_errors(PolicyTrigger::Status429) {
|
||||
return Some(PolicyTrigger::Status429);
|
||||
}
|
||||
|
||||
@@ -484,155 +510,167 @@ impl Requester {
|
||||
|
||||
/// query the statistics handler for the current number of errors based on the given policy
|
||||
async fn get_errors_by_policy(&self, trigger: PolicyTrigger) -> Result<usize> {
|
||||
match (self.policy_data.policy, trigger) {
|
||||
(RequesterPolicy::AutoBail, PolicyTrigger::Status403) => {
|
||||
Ok(self.handles.stats.data.status_403s())
|
||||
}
|
||||
(RequesterPolicy::AutoBail, PolicyTrigger::Status429) => {
|
||||
Ok(self.handles.stats.data.status_429s())
|
||||
}
|
||||
(RequesterPolicy::AutoBail, PolicyTrigger::Errors) => {
|
||||
Ok(self.handles.stats.data.errors())
|
||||
}
|
||||
(RequesterPolicy::AutoTune, _) => {
|
||||
// todo unwrap etc
|
||||
let errors = self
|
||||
.handles
|
||||
.ferox_scans()
|
||||
.unwrap()
|
||||
.get_scan_by_url(&self.target_url)
|
||||
.unwrap()
|
||||
.num_errors(trigger);
|
||||
Ok(errors)
|
||||
}
|
||||
// (RequesterPolicy::AutoTune, PolicyTrigger::Status429) => {
|
||||
// // todo unwrap etc
|
||||
// let errors = self.handles.ferox_scans().unwrap().get_scan_by_url(&self.target_url).unwrap().num_errors(trigger);
|
||||
// Ok(errors)
|
||||
// }
|
||||
// (RequesterPolicy::AutoTune, PolicyTrigger::Errors) => {
|
||||
// // todo unwrap etc
|
||||
// let errors = self.handles.ferox_scans().unwrap().get_scan_by_url(&self.target_url).unwrap().num_errors(trigger);
|
||||
// Ok(errors)
|
||||
// }
|
||||
// todo autotune error checking isn't quite right, it's checking overall errors then whether
|
||||
// or not its personal errors are > or ==, which often leads to over-adjusting down (probably)
|
||||
(RequesterPolicy::Default, _) => Ok(0),
|
||||
}
|
||||
Ok(self.ferox_scan.num_errors(trigger))
|
||||
}
|
||||
|
||||
/// todo doc
|
||||
async fn adjust_limit(&self, trigger: PolicyTrigger) -> Result<()> {
|
||||
async fn adjust_limit(&self, trigger: PolicyTrigger, url: &str) -> Result<()> {
|
||||
let errors = self.get_errors_by_policy(trigger).await?;
|
||||
// log::error!("[ADJUST ({})] {}", errors, self.target_url); // todo remove
|
||||
let policy_errors = atomic_load!(self.policy_data.errors, Ordering::SeqCst);
|
||||
|
||||
if errors > atomic_load!(self.policy_data.errors) {
|
||||
// errors have increased, need to reduce the requests/sec limit
|
||||
self.policy_data.adjust_down();
|
||||
self.policy_data.set_errors(errors);
|
||||
} else {
|
||||
// errors can only be incremented, so an else is sufficient
|
||||
self.policy_data.adjust_up();
|
||||
if let Ok(mut guard) = self.tuning_lock.try_lock() {
|
||||
if errors > policy_errors {
|
||||
// errors have increased, need to reduce the requests/sec limit
|
||||
*guard = 0; // reset streak counter to 0
|
||||
if atomic_load!(self.policy_data.errors) == 0 {
|
||||
log::error!(
|
||||
"[ADJUSTLIMIT FIRST] {} scan errs: {} pol errs: {} limit: {} limiter: {:?} orig: {}",
|
||||
self.target_url,
|
||||
errors,
|
||||
policy_errors,
|
||||
self.policy_data.get_limit(),
|
||||
self.rate_limiter,
|
||||
self.policy_data.heap.read().unwrap().original
|
||||
); // todo remove
|
||||
self.policy_data.set_errors(errors);
|
||||
} else {
|
||||
log::error!(
|
||||
"[ADJUSTLIMIT] {} scan errs: {} pol errs: {} limit: {} limiter: {:?} orig: {}",
|
||||
self.target_url,
|
||||
errors,
|
||||
policy_errors,
|
||||
self.policy_data.get_limit(),
|
||||
self.rate_limiter,
|
||||
self.policy_data.heap.read().unwrap().original
|
||||
); // todo remove
|
||||
|
||||
self.policy_data.set_errors(errors);
|
||||
self.policy_data.adjust_down(&self.target_url, url); // todo remove url
|
||||
}
|
||||
} else {
|
||||
// errors can only be incremented, so an else is sufficient
|
||||
*guard += 1;
|
||||
self.policy_data.adjust_up(&self.target_url, &*guard);
|
||||
}
|
||||
}
|
||||
|
||||
self.set_rate_limiter().await?;
|
||||
|
||||
log::error!(
|
||||
"[POST-CREATE] {} {:?}",
|
||||
self.target_url,
|
||||
self.rate_limiter.read().await
|
||||
);
|
||||
self.set_rate_limiter(0, false).await?; // todo remove params probably
|
||||
log::error!(
|
||||
"[POST-SET-NEW] {} {:?}",
|
||||
self.target_url,
|
||||
self.rate_limiter.read().await
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// lock the rate limiter and set its value to ta new leaky_bucket
|
||||
async fn set_rate_limiter(&self) -> Result<()> {
|
||||
let new_bucket = Self::build_a_bucket(self.policy_data.get_limit())?;
|
||||
async fn set_rate_limiter(&self, new_limit: usize, initial_tune: bool) -> Result<()> {
|
||||
let current = self.policy_data.get_limit(); // todo initial_tune not needed, just feed a value that isnt the current limit
|
||||
|
||||
if new_limit == current && !initial_tune {
|
||||
// this function is called more often than i'd prefer due to Send requirements of
|
||||
// mutex/rwlock primitives and awaits, this will minimize the cost of the extra calls
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let new_bucket = Self::build_a_bucket(current)?; // todo rename current if params removed
|
||||
let mut guard = self.rate_limiter.write().await;
|
||||
let _ = std::mem::replace(&mut *guard, Some(new_bucket));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// enforce auto-tune policy
|
||||
async fn tune(&self, trigger: PolicyTrigger) -> Result<()> {
|
||||
if self.rate_limiter.read().await.is_none() {
|
||||
async fn tune(&self, trigger: PolicyTrigger, url: &str) -> Result<()> {
|
||||
// todo 1: first-down and down times are extemely close together, still results in a
|
||||
// super quick reduction to a quarter of the scan speed instead of a half
|
||||
// todo 2: check for lock contention, feels like the tune checks aren't happening as often
|
||||
// as they should be
|
||||
// let enforcing = atomic_load!(self.policy_data.enforcing, Ordering::SeqCst);
|
||||
// if !enforcing {
|
||||
// atomic_store!(self.policy_data.enforcing, true, Ordering::SeqCst);
|
||||
// atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst);
|
||||
// // set original number of reqs/second the first time tune is called, skip otherwise
|
||||
// let reqs_sec = self.get_reqs_sec().await? as usize;
|
||||
// self.policy_data.set_reqs_sec(reqs_sec.clone());
|
||||
//
|
||||
// self.set_rate_limiter(reqs_sec, true).await?;
|
||||
// let errors = self.get_errors_by_policy(trigger).await?;
|
||||
// self.policy_data.set_errors(errors);
|
||||
//
|
||||
// atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst);
|
||||
// self.cool_down(WAIT_TIME).await;
|
||||
// log::error!(
|
||||
// "[FIRST DOWN ({} errors)] {} ({}) cur: {} new: {}",
|
||||
// errors,
|
||||
// self.target_url,
|
||||
// url,
|
||||
// self.policy_data.get_limit(),
|
||||
// reqs_sec
|
||||
// ); // todo remove
|
||||
// } else {
|
||||
// // todo try putting this in the same block and only guarding against these two calls?
|
||||
// let reqs_sec = self.get_reqs_sec().await? as usize;
|
||||
// self.policy_data.set_reqs_sec(reqs_sec.clone());
|
||||
//
|
||||
// self.adjust_limit(trigger, url).await?; // todo remove url
|
||||
//
|
||||
// self.cool_down(WAIT_TIME).await;
|
||||
//
|
||||
// // todo consider setting a 'tune' flag that prevents checking should_enforce, the thought
|
||||
// // being that once it's a yes, it's always a yes and tuning should be a time based thing
|
||||
// }
|
||||
if atomic_load!(self.policy_data.errors) == 0 {
|
||||
// set original number of reqs/second the first time tune is called, skip otherwise
|
||||
let reqs_sec = self.get_reqs_sec().await? as usize;
|
||||
self.policy_data.set_reqs_sec(reqs_sec);
|
||||
self.set_rate_limiter().await?;
|
||||
self.policy_data.set_reqs_sec(reqs_sec.clone());
|
||||
log::error!("[PRE-CREATE] {:?}", self.rate_limiter.read().await);
|
||||
self.set_rate_limiter(0, true).await?; // todo remove new_limit and initial from fn call (maybe)
|
||||
}
|
||||
|
||||
self.adjust_limit(trigger).await?;
|
||||
self.adjust_limit(trigger, url).await?;
|
||||
|
||||
self.cool_down(WAIT_TIME).await;
|
||||
|
||||
// todo consider setting a 'tune' flag that prevents checking should_enforce, the thought
|
||||
// being that once it's a yes, it's always a yes and tuning should be a time based thing
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// enforce auto-bail policy
|
||||
async fn bail(&self, trigger: PolicyTrigger) -> Result<()> {
|
||||
let scans = self.handles.ferox_scans()?;
|
||||
if self.ferox_scan.is_active() {
|
||||
log::warn!(
|
||||
"too many {:?} ({}) triggered {:?} Policy on {}",
|
||||
trigger,
|
||||
self.ferox_scan.num_errors(trigger),
|
||||
self.handles.config.requester_policy,
|
||||
self.ferox_scan
|
||||
);
|
||||
|
||||
let mut scan_tuples = vec![];
|
||||
// if allowed to be called within .abort, the inner .await makes it so other
|
||||
// in-flight requests don't see the Cancelled status, doing it here ensures a
|
||||
// minimum number of requests entering this block
|
||||
self.ferox_scan
|
||||
.set_status(ScanStatus::Cancelled)
|
||||
.unwrap_or_else(|e| log::warn!("Could not set scan status: {}", e));
|
||||
|
||||
if let Ok(guard) = scans.scans.read() {
|
||||
for (i, scan) in guard.iter().enumerate() {
|
||||
if scan.is_active() && scan.num_errors(trigger) > 0 {
|
||||
// only active scans that have at least 1 error
|
||||
scan_tuples.push((i, scan.num_errors(trigger)));
|
||||
}
|
||||
}
|
||||
}
|
||||
// kill the scan
|
||||
self.ferox_scan
|
||||
.abort()
|
||||
.await
|
||||
.unwrap_or_else(|e| log::warn!("Could not bail on scan: {}", e));
|
||||
|
||||
if scan_tuples.is_empty() {
|
||||
return Ok(());
|
||||
}
|
||||
// figure out how many requests are skipped as a result
|
||||
let pb = self.ferox_scan.progress_bar();
|
||||
let num_skipped = pb.length().saturating_sub(pb.position()) as usize;
|
||||
|
||||
// sort by number of errors
|
||||
scan_tuples.sort_unstable_by(|x, y| y.1.cmp(&x.1));
|
||||
|
||||
for (idx, _) in scan_tuples {
|
||||
let scan = if let Ok(guard) = scans.scans.read() {
|
||||
guard.index(idx).clone()
|
||||
} else {
|
||||
log::warn!("Could not acquire the FeroxScans.scans lock");
|
||||
continue;
|
||||
};
|
||||
|
||||
if scan.is_active() {
|
||||
log::debug!(
|
||||
"too many {:?} ({}) triggered {:?} Policy on {}",
|
||||
trigger,
|
||||
scan.num_errors(trigger),
|
||||
self.handles.config.requester_policy,
|
||||
scan
|
||||
);
|
||||
|
||||
// if allowed to be called within .abort, the inner .await makes it so other
|
||||
// in-flight requests don't see the Cancelled status, doing it here ensures a
|
||||
// minimum number of requests entering this block
|
||||
scan.set_status(ScanStatus::Cancelled)
|
||||
.unwrap_or_else(|e| log::warn!("Could not set scan status: {}", e));
|
||||
|
||||
// set cooldown flag before awaiting the abort to reduce chance of races
|
||||
self.cool_down(1500).await;
|
||||
|
||||
// kill the scan
|
||||
scan.abort()
|
||||
.await
|
||||
.unwrap_or_else(|e| log::warn!("Could not bail on scan: {}", e));
|
||||
|
||||
// figure out how many requests are skipped as a result
|
||||
let pb = scan.progress_bar();
|
||||
let num_skipped = pb.length().saturating_sub(pb.position()) as usize;
|
||||
|
||||
// update the overall scan bar by subtracting the number of skipped requests from
|
||||
// the total
|
||||
self.handles
|
||||
.stats
|
||||
.send(SubtractFromUsizeField(TotalExpected, num_skipped))
|
||||
.unwrap_or_else(|e| log::warn!("Could not update overall scan bar: {}", e));
|
||||
|
||||
break;
|
||||
}
|
||||
// update the overall scan bar by subtracting the number of skipped requests from
|
||||
// the total
|
||||
self.handles
|
||||
.stats
|
||||
.send(SubtractFromUsizeField(TotalExpected, num_skipped))
|
||||
.unwrap_or_else(|e| log::warn!("Could not update overall scan bar: {}", e));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
@@ -648,7 +686,11 @@ impl Requester {
|
||||
FeroxUrl::from_string(&self.target_url, self.handles.clone()).formatted_urls(word)?;
|
||||
|
||||
for url in urls {
|
||||
let should_limit = self.rate_limiter.read().await.is_some();
|
||||
// auto_tune is true, or rate_limit was set (mutually exclusive to user)
|
||||
// and a rate_limiter has been created
|
||||
// short-circuiting the lock access behind the first boolean check
|
||||
let should_tune = self.handles.config.auto_tune || self.handles.config.rate_limit > 0;
|
||||
let should_limit = should_tune && self.rate_limiter.read().await.is_some();
|
||||
|
||||
if should_limit {
|
||||
// found a rate limiter, limit that junk!
|
||||
@@ -660,20 +702,27 @@ impl Requester {
|
||||
|
||||
let response = logged_request(&url, self.handles.clone()).await?;
|
||||
|
||||
if !atomic_load!(self.policy_data.cooling_down) {
|
||||
// only check for policy enforcement when the trigger isn't on cooldown
|
||||
if (should_tune || self.handles.config.auto_bail)
|
||||
&& !atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst)
|
||||
{
|
||||
// only check for policy enforcement when the trigger isn't on cooldown and tuning
|
||||
// or bailing is in place (should_tune used here because when auto-tune is on, we'll
|
||||
// reach this without a rate_limiter in place)
|
||||
match self.policy_data.policy {
|
||||
RequesterPolicy::AutoTune => {
|
||||
// todo check for tune flag and short-circuit the enforce call
|
||||
if atomic_load!(SHOULD_TUNE) {
|
||||
let trigger = *TUNE_TRIGGER.lock().unwrap();
|
||||
self.tune(trigger).await?; // todo may or may not be right to bubble up
|
||||
} else if let Some(trigger) = self.should_enforce_policy() {
|
||||
if let Ok(mut guard) = TUNE_TRIGGER.lock() {
|
||||
*guard = trigger;
|
||||
}
|
||||
atomic_store!(SHOULD_TUNE, true);
|
||||
self.tune(trigger).await?; // todo may or may not be right to bubble up
|
||||
// if atomic_load!(SHOULD_TUNE) {
|
||||
// let trigger = *TUNE_TRIGGER.lock().unwrap();
|
||||
// self.tune(trigger).await?; // todo may or may not be right to bubble up
|
||||
// } else if let Some(trigger) = self.should_enforce_policy() {
|
||||
// if let Ok(mut guard) = TUNE_TRIGGER.lock() {
|
||||
// *guard = trigger;
|
||||
// }
|
||||
// atomic_store!(SHOULD_TUNE, true);
|
||||
// self.tune(trigger).await?; // todo may or may not be right to bubble up
|
||||
// }
|
||||
if let Some(trigger) = self.should_enforce_policy() {
|
||||
self.tune(trigger, &url.to_string()).await?; // todo bubble up? / remove url
|
||||
}
|
||||
}
|
||||
RequesterPolicy::AutoBail => {
|
||||
@@ -684,7 +733,7 @@ impl Requester {
|
||||
RequesterPolicy::Default => {}
|
||||
}
|
||||
}
|
||||
// todo requests/second on scan bar aren't showing
|
||||
|
||||
// response came back without error, convert it to FeroxResponse
|
||||
let ferox_response =
|
||||
FeroxResponse::from(response, true, self.handles.config.output_level).await;
|
||||
|
||||
@@ -20,6 +20,9 @@ macro_rules! atomic_load {
|
||||
($metric:expr) => {
|
||||
$metric.load(Ordering::Relaxed);
|
||||
};
|
||||
($metric:expr, $ordering:expr) => {
|
||||
$metric.load($ordering);
|
||||
};
|
||||
}
|
||||
|
||||
/// Wrapper around `Atomic*.store` to save me from writing Ordering::Relaxed a bajillion times
|
||||
@@ -28,4 +31,7 @@ macro_rules! atomic_store {
|
||||
($metric:expr, $value:expr) => {
|
||||
$metric.store($value, Ordering::Relaxed);
|
||||
};
|
||||
($metric:expr, $value:expr, $ordering:expr) => {
|
||||
$metric.store($value, $ordering);
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user