removed todos related to tuning

This commit is contained in:
epi
2021-02-17 12:23:49 -06:00
parent a09493b845
commit d78dbb76b1
6 changed files with 100 additions and 189 deletions

View File

@@ -11,7 +11,7 @@ Long form explanations of most of the items below can be found in the [CONTRIBUT
## Static analysis checks
- [ ] All rust files are formatted using `cargo fmt`
- [ ] All `clippy` checks pass when running `cargo clippy --all-targets --all-features -- -D warnings -A clippy::deref_addrof`
- [ ] All `clippy` checks pass when running `cargo clippy --all-targets --all-features -- -D warnings -A clippy::deref_addrof -A clippy::mutex-atomic`
- [ ] All existing tests pass
## Documentation

View File

@@ -61,4 +61,4 @@ jobs:
- uses: actions-rs/cargo@v1
with:
command: clippy
args: --all-targets --all-features -- -D warnings -A clippy::deref_addrof
args: --all-targets --all-features -- -D warnings -A clippy::deref_addrof -A clippy::mutex-atomic

View File

@@ -63,7 +63,7 @@ pub struct FeroxScan {
pub(super) errors: AtomicUsize,
/// tracker for the time at which this scan was started
pub(super) start_time: Instant
pub(super) start_time: Instant,
}
/// Default implementation for FeroxScan
@@ -144,15 +144,9 @@ impl FeroxScan {
/// Simple helper get a progress bar
pub fn progress_bar(&self) -> ProgressBar {
// log::error!("enter progressbar: {:?} ", self); // todo remove
match self.progress_bar.lock() {
Ok(mut guard) => {
// log::error!("{} - got lock", self.url); // todo remove
if guard.is_some() {
// log::error!("{} - returned {:?} clone", self.url, self.output_level); // todo remove
(*guard).as_ref().unwrap().clone()
} else {
let bar_type = match self.output_level {
@@ -170,7 +164,7 @@ impl FeroxScan {
}
}
Err(_) => {
log::error!("Could not unlock progress bar on {:?}", self); // todo change to warn
log::warn!("Could not unlock progress bar on {:?}", self);
let bar_type = match self.output_level {
OutputLevel::Default => BarType::Default,

View File

@@ -12,6 +12,7 @@ use indicatif::ProgressBar;
use predicates::prelude::*;
use std::sync::{atomic::Ordering, Arc};
use std::thread::sleep;
use std::time::Instant;
use tokio::time::{self, Duration};
#[test]
@@ -437,6 +438,7 @@ fn feroxscan_display() {
scan_order: ScanOrder::Latest,
scan_type: Default::default(),
num_requests: 0,
start_time: Instant::now(),
output_level: OutputLevel::Default,
status_403s: Default::default(),
status_429s: Default::default(),
@@ -480,6 +482,7 @@ async fn ferox_scan_abort() {
scan_order: ScanOrder::Latest,
scan_type: Default::default(),
num_requests: 0,
start_time: Instant::now(),
output_level: OutputLevel::Default,
status_403s: Default::default(),
status_429s: Default::default(),

View File

@@ -146,7 +146,10 @@ impl FeroxScanner {
});
}
}
requester_clone.request(&word).await
requester_clone
.request(&word)
.await
.unwrap_or_else(|e| log::warn!("Requester encountered an error: {}", e))
}),
pb,
)

View File

@@ -17,7 +17,6 @@ use crate::{
use anyhow::Result;
use crate::scan_manager::FeroxScan;
use crate::utils::ferox_print;
use leaky_bucket::LeakyBucket;
use std::{
cmp::max,
@@ -31,9 +30,6 @@ use tokio::{
time::{sleep, Duration},
};
/// default number of milliseconds to wait during a cooldown period
const WAIT_TIME: u64 = 3500;
#[derive(Copy, Clone, PartialEq, Debug)]
/// represents different situations where different criteria can trigger auto-tune/bail behavior
pub enum PolicyTrigger {
@@ -56,15 +52,18 @@ pub struct PolicyData {
/// whether or not we're in the middle of a cooldown period
cooling_down: AtomicBool,
/// length of time to pause tuning after making an adjustment
wait_time: u64,
/// rate limit (at last interval)
limit: AtomicUsize,
/// number of errors (at last interval)
errors: AtomicUsize,
/// whether or not the owning Requester has started enforcing the policy yet or not
/// (is set to true when the first should_enforce_policy branch is hit
enforcing: AtomicBool,
/// whether or not the owning Requester should remove the rate_limiter, happens when a scan
/// has been limited and moves back up to the point of its original scan speed
remove_limit: AtomicBool,
/// heap of values used for adjusting # of requests/second
heap: std::sync::RwLock<LimitHeap>,
@@ -73,9 +72,13 @@ pub struct PolicyData {
/// implementation of PolicyData
impl PolicyData {
/// given a RequesterPolicy, create a new PolicyData
fn new(policy: RequesterPolicy) -> Self {
fn new(policy: RequesterPolicy, timeout: u64) -> Self {
// can use this as a tweak for how aggressively adjustments should be made when tuning
let wait_time = ((timeout as f64 / 2.0) * 1000.0) as u64;
Self {
policy,
wait_time,
..Default::default()
}
}
@@ -85,7 +88,7 @@ impl PolicyData {
if let Ok(mut guard) = self.heap.write() {
guard.original = reqs_sec as i32;
guard.build();
self.set_limit(reqs_sec);
self.set_limit(guard.inner[0] as usize); // set limit to 1/2 of current request rate
}
}
@@ -105,10 +108,11 @@ impl PolicyData {
}
/// adjust the rate of requests per second up (increase rate)
fn adjust_up(&self, url: &str, streak_counter: &usize) {
// log::error!("enter: adjust up"); // todo remove
fn adjust_up(&self, streak_counter: &usize) {
if let Ok(mut heap) = self.heap.try_write() {
if *streak_counter > 2 {
// streak of 3 upward moves in a row, traverse the tree upward instead of to a
// higher-valued branch lower in the tree
let current = heap.value();
heap.move_up();
heap.move_up();
@@ -117,76 +121,43 @@ impl PolicyData {
// value greater than the current node's and other times we need to move 3 up
// to arrive at a greater value
if heap.has_parent() {
// all nodes except 0th node (root)
heap.move_up();
} else {
// been here enough that we can try resuming the scan to its original
// speed (no limiting at all)
heap.inner[0] = heap.original;
atomic_store!(self.remove_limit, true);
}
self.set_limit(heap.value() as usize);
log::error!(
"[STREAK UP ({} errors)] {} cur: {} new: {} orig: {}",
atomic_load!(self.errors),
url,
current,
heap.value(),
heap.original,
); // todo remove
}
} else if heap.has_children() {
let old_limit = heap.value(); // todo remove
// streak not at 3, just check that we can move down, and do so
heap.move_left();
self.set_limit(heap.value() as usize);
log::error!(
"[UP ({} errors)] {} cur: {} new: {} orig: {}",
atomic_load!(self.errors),
url,
old_limit,
heap.value(),
heap.original
); // todo remove
} else {
let current = heap.value(); // todo remove
// tree bottomed out, need to move back up the tree a bit
let current = heap.value();
heap.move_up();
heap.move_up();
if current > heap.value() {
heap.move_up();
}
self.set_limit(heap.value() as usize);
log::error!(
"[BOTTOMED-OUT UP ({} errors)] {} cur: {} new: {} orig: {}",
atomic_load!(self.errors),
url,
current,
heap.value(),
heap.original
); // todo remove
}
}
// log::error!("exit: adjust up"); // todo remove
}
/// adjust the rate of requests per second down (decrease rate)
fn adjust_down(&self, target_url: &str, url: &str) {
// log::error!("enter: adjust down: {:?}", self); // todo remove
fn adjust_down(&self) {
if let Ok(mut heap) = self.heap.try_write() {
if heap.has_children() {
let old_limit = heap.value(); // todo remove
heap.move_right();
self.set_limit(heap.value() as usize);
log::error!(
"[DOWN ({} errors)] {} ({}) cur: {} new: {} orig: {}",
atomic_load!(self.errors),
target_url,
url,
old_limit,
heap.value(),
heap.original
); // todo remove
}
}
// log::error!("exit: adjust down"); // todo remove
}
}
@@ -386,7 +357,10 @@ impl Requester {
None
};
let policy_data = PolicyData::new(scanner.handles.config.requester_policy);
let policy_data = PolicyData::new(
scanner.handles.config.requester_policy,
scanner.handles.config.timeout,
);
Ok(Self {
ferox_scan,
@@ -400,8 +374,8 @@ impl Requester {
/// build a LeakyBucket, given a rate limit (as requests per second)
fn build_a_bucket(limit: usize) -> Result<LeakyBucket> {
let refill = max(limit / 10, 1); // minimum of 1 per second
let tokens = max(limit / 2, 1);
let refill = max((limit as f64 / 10.0).round() as usize, 1); // minimum of 1 per second
let tokens = max((limit as f64 / 2.0).round() as usize, 1);
let interval = if refill == 1 { 1000 } else { 100 }; // 1 second if refill is 1
Ok(LeakyBucket::builder()
@@ -412,13 +386,8 @@ impl Requester {
.build()?)
}
/// query the statistics handler in order to get the (current) number of requests/second
async fn get_reqs_sec(&self) -> Result<f64> {
Ok(self.ferox_scan.requests_per_second() as f64) // todo arguable that this needs to be a fn
}
/// sleep and set a flag that can be checked by other threads
async fn cool_down(&self, wait_time: u64) {
async fn cool_down(&self) {
if atomic_load!(self.policy_data.cooling_down, Ordering::SeqCst) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
return;
@@ -426,7 +395,7 @@ impl Requester {
atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst);
sleep(Duration::from_millis(wait_time)).await;
sleep(Duration::from_millis(self.policy_data.wait_time)).await;
atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst);
}
@@ -447,14 +416,8 @@ impl Requester {
fn too_many_errors(&self) -> bool {
let total = self.ferox_scan.num_errors(PolicyTrigger::Errors);
let (threads, tmax) = match self.policy_data.policy {
// RequesterPolicy::AutoBail => (self.handles.config.threads / 2, 25),
_ => (self.handles.config.threads / 2, 25),
// _ => (self.handles.config.threads, 50), // todo if unused, remove lint / update comment below
};
// at least 50 errors
let threshold = max(threads, tmax);
// at least 25 errors
let threshold = max(self.handles.config.threads / 2, 25);
total >= threshold
}
@@ -509,130 +472,75 @@ impl Requester {
}
/// query the statistics handler for the current number of errors based on the given policy
async fn get_errors_by_policy(&self, trigger: PolicyTrigger) -> Result<usize> {
async fn get_scan_errors_by_policy(&self, trigger: PolicyTrigger) -> Result<usize> {
Ok(self.ferox_scan.num_errors(trigger))
}
/// todo doc
async fn adjust_limit(&self, trigger: PolicyTrigger, url: &str) -> Result<()> {
let errors = self.get_errors_by_policy(trigger).await?;
/// wrapper for adjust_[up,down] functions, checks error levels to determine adjustment direction
async fn adjust_limit(&self, trigger: PolicyTrigger) -> Result<()> {
let scan_errors = self.get_scan_errors_by_policy(trigger).await?;
let policy_errors = atomic_load!(self.policy_data.errors, Ordering::SeqCst);
if let Ok(mut guard) = self.tuning_lock.try_lock() {
if errors > policy_errors {
if scan_errors > policy_errors {
// errors have increased, need to reduce the requests/sec limit
*guard = 0; // reset streak counter to 0
if atomic_load!(self.policy_data.errors) == 0 {
log::error!(
"[ADJUSTLIMIT FIRST] {} scan errs: {} pol errs: {} limit: {} limiter: {:?} orig: {}",
self.target_url,
errors,
policy_errors,
self.policy_data.get_limit(),
self.rate_limiter,
self.policy_data.heap.read().unwrap().original
); // todo remove
self.policy_data.set_errors(errors);
} else {
log::error!(
"[ADJUSTLIMIT] {} scan errs: {} pol errs: {} limit: {} limiter: {:?} orig: {}",
self.target_url,
errors,
policy_errors,
self.policy_data.get_limit(),
self.rate_limiter,
self.policy_data.heap.read().unwrap().original
); // todo remove
self.policy_data.set_errors(errors);
self.policy_data.adjust_down(&self.target_url, url); // todo remove url
if atomic_load!(self.policy_data.errors) != 0 {
self.policy_data.adjust_down();
}
self.policy_data.set_errors(scan_errors);
} else {
// errors can only be incremented, so an else is sufficient
*guard += 1;
self.policy_data.adjust_up(&self.target_url, &*guard);
self.policy_data.adjust_up(&*guard);
}
}
log::error!(
"[POST-CREATE] {} {:?}",
self.target_url,
self.rate_limiter.read().await
);
self.set_rate_limiter(0, false).await?; // todo remove params probably
log::error!(
"[POST-SET-NEW] {} {:?}",
self.target_url,
self.rate_limiter.read().await
);
if atomic_load!(self.policy_data.remove_limit) {
self.set_rate_limiter(None).await?;
atomic_store!(self.policy_data.remove_limit, false);
} else {
let new_limit = self.policy_data.get_limit(); // limit is set from within the lock
self.set_rate_limiter(Some(new_limit)).await?;
}
Ok(())
}
/// lock the rate limiter and set its value to ta new leaky_bucket
async fn set_rate_limiter(&self, new_limit: usize, initial_tune: bool) -> Result<()> {
let current = self.policy_data.get_limit(); // todo initial_tune not needed, just feed a value that isnt the current limit
async fn set_rate_limiter(&self, new_limit: Option<usize>) -> Result<()> {
let mut guard = self.rate_limiter.write().await;
let new_bucket = if new_limit.is_none() {
// got None, need to remove the rate_limiter
None
} else if guard.is_some() && guard.as_ref().unwrap().max() == new_limit.unwrap() {
// new_limit is checked for None in first branch, should be fine to unwrap
if new_limit == current && !initial_tune {
// this function is called more often than i'd prefer due to Send requirements of
// mutex/rwlock primitives and awaits, this will minimize the cost of the extra calls
return Ok(());
}
} else {
Some(Self::build_a_bucket(new_limit.unwrap())?)
};
let new_bucket = Self::build_a_bucket(current)?; // todo rename current if params removed
let mut guard = self.rate_limiter.write().await;
let _ = std::mem::replace(&mut *guard, Some(new_bucket));
let _ = std::mem::replace(&mut *guard, new_bucket);
Ok(())
}
/// enforce auto-tune policy
async fn tune(&self, trigger: PolicyTrigger, url: &str) -> Result<()> {
// todo 1: first-down and down times are extemely close together, still results in a
// super quick reduction to a quarter of the scan speed instead of a half
// todo 2: check for lock contention, feels like the tune checks aren't happening as often
// as they should be
// let enforcing = atomic_load!(self.policy_data.enforcing, Ordering::SeqCst);
// if !enforcing {
// atomic_store!(self.policy_data.enforcing, true, Ordering::SeqCst);
// atomic_store!(self.policy_data.cooling_down, true, Ordering::SeqCst);
// // set original number of reqs/second the first time tune is called, skip otherwise
// let reqs_sec = self.get_reqs_sec().await? as usize;
// self.policy_data.set_reqs_sec(reqs_sec.clone());
//
// self.set_rate_limiter(reqs_sec, true).await?;
// let errors = self.get_errors_by_policy(trigger).await?;
// self.policy_data.set_errors(errors);
//
// atomic_store!(self.policy_data.cooling_down, false, Ordering::SeqCst);
// self.cool_down(WAIT_TIME).await;
// log::error!(
// "[FIRST DOWN ({} errors)] {} ({}) cur: {} new: {}",
// errors,
// self.target_url,
// url,
// self.policy_data.get_limit(),
// reqs_sec
// ); // todo remove
// } else {
// // todo try putting this in the same block and only guarding against these two calls?
// let reqs_sec = self.get_reqs_sec().await? as usize;
// self.policy_data.set_reqs_sec(reqs_sec.clone());
//
// self.adjust_limit(trigger, url).await?; // todo remove url
//
// self.cool_down(WAIT_TIME).await;
//
// // todo consider setting a 'tune' flag that prevents checking should_enforce, the thought
// // being that once it's a yes, it's always a yes and tuning should be a time based thing
// }
async fn tune(&self, trigger: PolicyTrigger) -> Result<()> {
if atomic_load!(self.policy_data.errors) == 0 {
// set original number of reqs/second the first time tune is called, skip otherwise
let reqs_sec = self.get_reqs_sec().await? as usize;
self.policy_data.set_reqs_sec(reqs_sec.clone());
log::error!("[PRE-CREATE] {:?}", self.rate_limiter.read().await);
self.set_rate_limiter(0, true).await?; // todo remove new_limit and initial from fn call (maybe)
}
self.adjust_limit(trigger, url).await?;
let reqs_sec = self.ferox_scan.requests_per_second() as usize;
self.policy_data.set_reqs_sec(reqs_sec);
self.cool_down(WAIT_TIME).await;
let new_limit = self.policy_data.get_limit();
self.set_rate_limiter(Some(new_limit)).await?;
}
self.adjust_limit(trigger).await?;
self.cool_down().await;
Ok(())
}
@@ -710,24 +618,13 @@ impl Requester {
// reach this without a rate_limiter in place)
match self.policy_data.policy {
RequesterPolicy::AutoTune => {
// todo check for tune flag and short-circuit the enforce call
// if atomic_load!(SHOULD_TUNE) {
// let trigger = *TUNE_TRIGGER.lock().unwrap();
// self.tune(trigger).await?; // todo may or may not be right to bubble up
// } else if let Some(trigger) = self.should_enforce_policy() {
// if let Ok(mut guard) = TUNE_TRIGGER.lock() {
// *guard = trigger;
// }
// atomic_store!(SHOULD_TUNE, true);
// self.tune(trigger).await?; // todo may or may not be right to bubble up
// }
if let Some(trigger) = self.should_enforce_policy() {
self.tune(trigger, &url.to_string()).await?; // todo bubble up? / remove url
self.tune(trigger).await?;
}
}
RequesterPolicy::AutoBail => {
if let Some(trigger) = self.should_enforce_policy() {
self.bail(trigger).await?; // todo may or may not be right to bubble up
self.bail(trigger).await?;
}
}
RequesterPolicy::Default => {}
@@ -931,6 +828,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
@@ -952,6 +851,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
@@ -973,6 +874,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
@@ -997,6 +900,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
@@ -1036,6 +941,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://one/one/stuff.php".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
@@ -1067,6 +974,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://one/one/stuff.php".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
@@ -1086,6 +995,8 @@ mod tests {
let requester = Requester {
handles,
tuning_lock: Mutex::new(0),
ferox_scan: Arc::new(FeroxScan::default()),
target_url: "http://localhost".to_string(),
rate_limiter: RwLock::new(None),
policy_data: Default::default(),