incremental save before branch swap

This commit is contained in:
epi
2021-02-15 07:37:50 -06:00
parent 4019c31f9d
commit c4f072e159
8 changed files with 442 additions and 74 deletions

2
.gitignore vendored
View File

@@ -24,3 +24,5 @@ lcov_cobertura.py
# state file created during tests
ferox-http*
Pipfile*

View File

@@ -58,7 +58,7 @@ _feroxbuster() {
'*--filter-similar-to=[Filter out pages that are similar to the given page (ex. --filter-similar-to http://site.xyz/soft404)]' \
'-L+[Limit total number of concurrent scans (default: 0, i.e. no limit)]' \
'--scan-limit=[Limit total number of concurrent scans (default: 0, i.e. no limit)]' \
'--rate-limit=[Limit number of requests per second (per directory) (default: 0, i.e. no limit)]' \
'(--auto-tune)--rate-limit=[Limit number of requests per second (per directory) (default: 0, i.e. no limit)]' \
'--time-limit=[Limit total run time of all scans (ex: --time-limit 10m)]' \
'(--silent)*-v[Increase verbosity level (use -vv or more for greater effect. \[CAUTION\] 4 -v'\''s is probably too much)]' \
'(--silent)*--verbosity[Increase verbosity level (use -vv or more for greater effect. \[CAUTION\] 4 -v'\''s is probably too much)]' \

View File

@@ -61,6 +61,9 @@ pub enum Command {
/// Command used to test that a spawned task succeeded in initialization
Ping,
/// Query the statistics handler about the total elapsed scan time
GetRuntime(Sender<f64>),
/// Just receive a sender and reply, used for slowing down the main thread
Sync(Sender<bool>),

View File

@@ -124,6 +124,11 @@ impl StatsHandler {
Command::Sync(sender) => {
sender.send(true).unwrap_or_default();
}
Command::GetRuntime(sender) => {
sender
.send(start.elapsed().as_secs_f64())
.unwrap_or_default();
}
Command::Exit => break,
_ => {} // no more commands needed
}

View File

@@ -160,7 +160,6 @@ impl FeroxScan {
pb.reset_elapsed();
let _ = std::mem::replace(&mut *guard, Some(pb.clone()));
log::error!("{} - creating new {:?} bar", self.url, self.output_level); // todo remove
pb
}

View File

@@ -422,7 +422,7 @@ impl FeroxScans {
let bar = match scan_type {
ScanType::Directory => {
let bar_type = match self.output_level {
OutputLevel::Default => BarType::Message,
OutputLevel::Default => BarType::Default,
OutputLevel::Quiet => BarType::Quiet,
OutputLevel::Silent => BarType::Hidden,
};

View File

@@ -1,13 +1,14 @@
use super::FeroxScanner;
use crate::scan_manager::ScanStatus;
use crate::{
atomic_load, atomic_store,
config::RequesterPolicy,
event_handlers::{
Command::{self, AddError, SubtractFromUsizeField},
Command::{self, AddError, GetRuntime, SubtractFromUsizeField},
Handles,
},
extractor::{ExtractionTarget::ResponseBody, ExtractorBuilder},
response::FeroxResponse,
scan_manager::ScanStatus,
statistics::{
StatError::Other,
StatField::{Enforced403s, Enforced429s, EnforcedErrors, TotalExpected},
@@ -17,17 +18,32 @@ use crate::{
HIGH_ERROR_RATIO,
};
use anyhow::Result;
use lazy_static::lazy_static;
use leaky_bucket::LeakyBucket;
use std::ops::Index;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::{cmp::max, sync::Arc};
use std::{
cmp::max,
ops::Index,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
use tokio::{
sync::oneshot,
sync::{oneshot, RwLock},
time::{sleep, Duration},
};
/// default number of seconds to wait during a cooldown period
const WAIT_TIME: u64 = 5;
/// default number of milliseconds to wait during a cooldown period
const WAIT_TIME: u64 = 1250;
lazy_static! {
/// todo doc
static ref SHOULD_TUNE: AtomicBool = AtomicBool::new(false);
/// todo doc
static ref TUNE_TRIGGER: std::sync::Mutex<PolicyTrigger> = std::sync::Mutex::new(PolicyTrigger::Errors);
}
#[derive(Copy, Clone, PartialEq, Debug)]
/// represents different situations where different criteria can trigger auto-tune/bail behavior
@@ -43,16 +59,22 @@ pub enum PolicyTrigger {
}
/// data regarding policy and metadata about last enforced trigger etc...
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PolicyData {
/// how to handle exceptional cases such as too many errors / 403s / 429s etc
policy: RequesterPolicy,
/// number of seconds to wait between checks for policy enforcement
wait_time: AtomicU64,
/// whether or not we're in the middle of a cooldown period
cooling_down: AtomicBool,
/// rate limit (at last interval)
limit: AtomicUsize,
/// number of errors (at last interval)
errors: AtomicUsize,
/// heap of values used for adjusting # of requests/second
heap: std::sync::RwLock<LimitHeap>,
}
/// implementation of PolicyData
@@ -61,33 +83,240 @@ impl PolicyData {
fn new(policy: RequesterPolicy) -> Self {
Self {
policy,
wait_time: AtomicU64::new(WAIT_TIME),
cooling_down: AtomicBool::new(false),
..Default::default()
}
}
/// todo doc
async fn backoff(&self, wait: Option<u64>) {
if self.cooling_down.load(Ordering::Relaxed) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
return;
/// setter for requests / second; populates the underlying heap with values from req/sec seed
fn set_reqs_sec(&self, reqs_sec: usize) {
if let Ok(mut guard) = self.heap.write() {
guard.original = reqs_sec as i32;
guard.build();
self.set_limit(guard.value() as usize);
}
}
let current = if let Some(wt) = wait {
// called with optional wait param, only sleep for this length of time
wt
} else {
// exponential backoff, doubles with each policy trigger
// todo update comment above, i think i want to start at half and go back up to the
// original
self.wait_time
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |wt| Some(wt * 2))
.unwrap_or(WAIT_TIME)
};
/// setter for errors
fn set_errors(&self, errors: usize) {
atomic_store!(self.errors, errors);
}
self.cooling_down.store(true, Ordering::Relaxed);
sleep(Duration::new(current, 0)).await;
self.cooling_down.store(false, Ordering::Relaxed);
/// setter for limit
fn set_limit(&self, limit: usize) {
atomic_store!(self.limit, limit);
}
/// getter for limit
fn get_limit(&self) -> usize {
atomic_load!(self.limit)
}
/// adjust the rate of requests per second up (increase rate)
fn adjust_up(&self) {
// log::error!("enter: adjust up"); // todo remove
if let Ok(mut heap) = self.heap.write() {
if heap.has_children() {
let old_limit = heap.value(); // todo remove
heap.move_left();
self.set_limit(heap.value() as usize);
// log::error!(
// "[UP ({})] current limit: {} new limit: {}",
// atomic_load!(self.errors),
// old_limit,
// heap.value()
// ); // todo remove
} else {
let old_limit = heap.value(); // todo remove
heap.move_up();
self.set_limit(heap.value() as usize);
// log::error!(
// "[UP ({})] current limit: {} new limit: {}",
// atomic_load!(self.errors),
// old_limit,
// heap.value()
// ); // todo remove
}
}
// log::error!("exit: adjust up"); // todo remove
}
/// adjust the rate of requests per second down (decrease rate)
fn adjust_down(&self) {
// log::error!("enter: adjust down: {:?}", self); // todo remove
if let Ok(mut heap) = self.heap.write() {
if heap.has_children() {
let old_limit = heap.value(); // todo remove
heap.move_right();
self.set_limit(heap.value() as usize);
// log::error!(
// "[DOWN ({})] current limit: {} new limit: {}",
// atomic_load!(self.errors),
// old_limit,
// heap.value()
// ); // todo remove
}
}
// log::error!("exit: adjust down"); // todo remove
}
}
/// bespoke variation on an array-backed max-heap
///
/// 255 possible values generated from the initial requests/second
///
/// when no additional errors are encountered, the left child is taken (increasing req/sec)
/// if errors have increased since the last interval, the right child is taken (decreasing req/sec)
///
/// formula for each child:
/// - left: (|parent - current|) / 2 + current
/// - right: current - ((|parent - current|) / 2)
#[derive(Debug)]
struct LimitHeap {
/// backing array, 255 nodes == height of 7 ( 2^(h+1) -1 nodes )
inner: [i32; 255],
/// original # of requests / second
original: i32,
/// current position w/in the backing array
current: usize,
}
/// default implementation of a LimitHeap
impl Default for LimitHeap {
/// zero-initialize the backing array
fn default() -> Self {
Self {
inner: [0; 255],
original: 0,
current: 0,
}
}
}
/// implementation of a LimitHeap
impl LimitHeap {
/// move to right child, return node's index from which the move was requested
fn move_right(&mut self) -> usize {
if self.has_children() {
let tmp = self.current;
self.current = self.current * 2 + 2;
return tmp;
}
self.current
}
/// move to left child, return node's index from which the move was requested
fn move_left(&mut self) -> usize {
if self.has_children() {
let tmp = self.current;
self.current = self.current * 2 + 1;
return tmp;
}
self.current
}
/// move to parent, return node's index from which the move was requested
fn move_up(&mut self) -> usize {
if self.has_parent() {
let tmp = self.current;
self.current = (self.current - 1) / 2;
return tmp;
}
self.current
}
/// move directly to the given index
fn move_to(&mut self, index: usize) {
self.current = index;
}
/// get the current node's value
fn value(&self) -> i32 {
self.inner[self.current]
}
/// set the current node's value
fn set_value(&mut self, value: i32) {
self.inner[self.current] = value;
}
/// check that this node has a parent (true for all except root)
fn has_parent(&self) -> bool {
self.current > 0
}
/// get node's parent's value or self.original if at the root
fn parent_value(&mut self) -> i32 {
if self.has_parent() {
let current = self.move_up();
let val = self.value();
self.move_to(current);
return val;
}
self.original
}
/// check if the current node has children
fn has_children(&self) -> bool {
// inner structure is a complete tree, just check for the right child
self.current * 2 + 2 <= self.inner.len()
}
/// get current node's right child's value
fn right_child_value(&mut self) -> i32 {
let tmp = self.move_right();
let val = self.value();
self.move_to(tmp);
val
}
/// set current node's left child's value
fn set_left_child(&mut self) {
let parent = self.parent_value();
let current = self.value();
let value = ((parent - current).abs() / 2) + current;
self.move_left();
self.set_value(value);
self.move_up();
}
/// set current node's right child's value
fn set_right_child(&mut self) {
let parent = self.parent_value();
let current = self.value();
let value = current - ((parent - current).abs() / 2);
self.move_right();
self.set_value(value);
self.move_up();
}
/// iterate over the backing array, filling in each child's value based on the original value
fn build(&mut self) {
// ex: original is 400
// arr[0] == 200
// arr[1] (left child) == 300
// arr[2] (right child) == 100
let root = self.original / 2;
self.inner[0] = root; // set root node to half of the original value
self.inner[1] = ((self.original - root).abs() / 2) + root;
self.inner[2] = root - ((self.original - root).abs() / 2);
// start with index 1 and fill in each child below that node
for i in 1..self.inner.len() {
self.move_to(i);
if self.has_children() && self.right_child_value() == 0 {
// this node has an unset child since the rchild is 0
self.set_left_child();
self.set_right_child();
}
}
self.move_to(0); // reset current index to the root of the tree
}
}
@@ -100,7 +329,7 @@ pub(super) struct Requester {
target_url: String,
/// limits requests per second if present
rate_limiter: Option<LeakyBucket>,
rate_limiter: RwLock<Option<LeakyBucket>>,
/// data regarding policy and metadata about last enforced trigger etc...
policy_data: PolicyData,
@@ -111,18 +340,9 @@ impl Requester {
/// given a FeroxScanner, create a Requester
pub fn from(scanner: &FeroxScanner) -> Result<Self> {
let limit = scanner.handles.config.rate_limit;
let refill = max(limit / 10, 1); // minimum of 1 per second
let tokens = max(limit / 2, 1);
let interval = if refill == 1 { 1000 } else { 100 }; // 1 second if refill is 1
let rate_limiter = if limit > 0 {
let bucket = LeakyBucket::builder()
.refill_interval(Duration::from_millis(interval)) // add tokens every 0.1s
.refill_amount(refill) // ex: 100 req/s -> 10 tokens per 0.1s
.tokens(tokens) // reduce initial burst, 2 is arbitrary, but felt good
.max(limit)
.build()?;
Some(bucket)
Some(Self::build_a_bucket(limit)?)
} else {
None
};
@@ -131,15 +351,60 @@ impl Requester {
Ok(Self {
policy_data,
rate_limiter,
rate_limiter: RwLock::new(rate_limiter),
handles: scanner.handles.clone(),
target_url: scanner.target_url.to_owned(),
})
}
/// build a LeakyBucket, given a rate limit (as requests per second)
fn build_a_bucket(limit: usize) -> Result<LeakyBucket> {
let refill = max(limit / 10, 1); // minimum of 1 per second
let tokens = max(limit / 2, 1);
let interval = if refill == 1 { 1000 } else { 100 }; // 1 second if refill is 1
Ok(LeakyBucket::builder()
.refill_interval(Duration::from_millis(interval)) // add tokens every 0.1s
.refill_amount(refill) // ex: 100 req/s -> 10 tokens per 0.1s
.tokens(tokens) // reduce initial burst, 2 is arbitrary, but felt good
.max(limit)
.build()?)
}
/// query the statistics handler in order to get the (current) number of requests/second
async fn get_reqs_sec(&self) -> Result<f64> {
let reqs = atomic_load!(self.handles.stats.data.requests) as f64;
let (tx, rx) = oneshot::channel::<f64>();
self.handles.stats.send(GetRuntime(tx))?;
let secs = rx.await?;
Ok(reqs / secs)
}
/// sleep and set a flag that can be checked by other threads
async fn cool_down(&self, wait_time: u64) {
if atomic_load!(self.policy_data.cooling_down) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
return;
}
atomic_store!(self.policy_data.cooling_down, true);
sleep(Duration::from_millis(wait_time)).await;
atomic_store!(self.policy_data.cooling_down, false);
}
/// limit the number of requests per second
pub async fn limit(&self) -> Result<()> {
self.rate_limiter.as_ref().unwrap().acquire_one().await?;
self.rate_limiter
.read()
.await
.as_ref()
.unwrap()
.acquire_one()
.await?;
Ok(())
}
@@ -157,19 +422,19 @@ impl Requester {
.update_usize_field(field, num_errors);
}
/// determine whether or not a policy needs to be enforce
/// determine whether or not a policy needs to be enforced
///
/// criteria:
/// - number of threads (50 default) for general errors (timeouts etc)
/// - 90% of requests are 403
/// - 30% of requests are 429
fn should_enforce_policy(&self) -> Option<PolicyTrigger> {
if self.policy_data.cooling_down.load(Ordering::Relaxed) {
if atomic_load!(self.policy_data.cooling_down) {
// prevents a few racy threads making it in here and doubling the wait time erroneously
return None;
}
let requests = self.handles.stats.data.requests.load(Ordering::Relaxed);
let requests = atomic_load!(self.handles.stats.data.requests);
if requests < max(self.handles.config.threads, 50) {
// check whether at least a full round of threads has made requests or 50 (default # of
@@ -217,8 +482,90 @@ impl Requester {
None
}
/// query the statistics handler for the current number of errors based on the given policy
async fn get_errors_by_policy(&self, trigger: PolicyTrigger) -> Result<usize> {
match (self.policy_data.policy, trigger) {
(RequesterPolicy::AutoBail, PolicyTrigger::Status403) => {
Ok(self.handles.stats.data.status_403s())
}
(RequesterPolicy::AutoBail, PolicyTrigger::Status429) => {
Ok(self.handles.stats.data.status_429s())
}
(RequesterPolicy::AutoBail, PolicyTrigger::Errors) => {
Ok(self.handles.stats.data.errors())
}
(RequesterPolicy::AutoTune, _) => {
// todo unwrap etc
let errors = self
.handles
.ferox_scans()
.unwrap()
.get_scan_by_url(&self.target_url)
.unwrap()
.num_errors(trigger);
Ok(errors)
}
// (RequesterPolicy::AutoTune, PolicyTrigger::Status429) => {
// // todo unwrap etc
// let errors = self.handles.ferox_scans().unwrap().get_scan_by_url(&self.target_url).unwrap().num_errors(trigger);
// Ok(errors)
// }
// (RequesterPolicy::AutoTune, PolicyTrigger::Errors) => {
// // todo unwrap etc
// let errors = self.handles.ferox_scans().unwrap().get_scan_by_url(&self.target_url).unwrap().num_errors(trigger);
// Ok(errors)
// }
// todo autotune error checking isn't quite right, it's checking overall errors then whether
// or not its personal errors are > or ==, which often leads to over-adjusting down (probably)
(RequesterPolicy::Default, _) => Ok(0),
}
}
/// todo doc
async fn adjust_limit(&self, trigger: PolicyTrigger) -> Result<()> {
let errors = self.get_errors_by_policy(trigger).await?;
// log::error!("[ADJUST ({})] {}", errors, self.target_url); // todo remove
if errors > atomic_load!(self.policy_data.errors) {
// errors have increased, need to reduce the requests/sec limit
self.policy_data.adjust_down();
self.policy_data.set_errors(errors);
} else {
// errors can only be incremented, so an else is sufficient
self.policy_data.adjust_up();
}
self.set_rate_limiter().await?;
Ok(())
}
/// lock the rate limiter and set its value to ta new leaky_bucket
async fn set_rate_limiter(&self) -> Result<()> {
let new_bucket = Self::build_a_bucket(self.policy_data.get_limit())?;
let mut guard = self.rate_limiter.write().await;
let _ = std::mem::replace(&mut *guard, Some(new_bucket));
Ok(())
}
/// enforce auto-tune policy
fn tune(&self, _trigger: PolicyTrigger) {}
async fn tune(&self, trigger: PolicyTrigger) -> Result<()> {
if self.rate_limiter.read().await.is_none() {
// set original number of reqs/second the first time tune is called, skip otherwise
let reqs_sec = self.get_reqs_sec().await? as usize;
self.policy_data.set_reqs_sec(reqs_sec);
self.set_rate_limiter().await?;
}
self.adjust_limit(trigger).await?;
self.cool_down(WAIT_TIME).await;
// todo consider setting a 'tune' flag that prevents checking should_enforce, the thought
// being that once it's a yes, it's always a yes and tuning should be a time based thing
Ok(())
}
/// enforce auto-bail policy
async fn bail(&self, trigger: PolicyTrigger) -> Result<()> {
@@ -238,6 +585,7 @@ impl Requester {
if scan_tuples.is_empty() {
return Ok(());
}
// sort by number of errors
scan_tuples.sort_unstable_by(|x, y| y.1.cmp(&x.1));
@@ -249,8 +597,6 @@ impl Requester {
continue;
};
// todo tune should use the backoff strategy
if scan.is_active() {
log::debug!(
"too many {:?} ({}) triggered {:?} Policy on {}",
@@ -267,7 +613,7 @@ impl Requester {
.unwrap_or_else(|e| log::warn!("Could not set scan status: {}", e));
// set cooldown flag before awaiting the abort to reduce chance of races
self.policy_data.backoff(Some(1)).await;
self.cool_down(1500).await;
// kill the scan
scan.abort()
@@ -302,7 +648,9 @@ impl Requester {
FeroxUrl::from_string(&self.target_url, self.handles.clone()).formatted_urls(word)?;
for url in urls {
if self.rate_limiter.is_some() {
let should_limit = self.rate_limiter.read().await.is_some();
if should_limit {
// found a rate limiter, limit that junk!
if let Err(e) = self.limit().await {
log::warn!("Could not rate limit scan: {}", e);
@@ -312,12 +660,20 @@ impl Requester {
let response = logged_request(&url, self.handles.clone()).await?;
if !self.policy_data.cooling_down.load(Ordering::Relaxed) {
if !atomic_load!(self.policy_data.cooling_down) {
// only check for policy enforcement when the trigger isn't on cooldown
match self.policy_data.policy {
RequesterPolicy::AutoTune => {
if let Some(trigger) = self.should_enforce_policy() {
self.tune(trigger);
// todo check for tune flag and short-circuit the enforce call
if atomic_load!(SHOULD_TUNE) {
let trigger = *TUNE_TRIGGER.lock().unwrap();
self.tune(trigger).await?; // todo may or may not be right to bubble up
} else if let Some(trigger) = self.should_enforce_policy() {
if let Ok(mut guard) = TUNE_TRIGGER.lock() {
*guard = trigger;
}
atomic_store!(SHOULD_TUNE, true);
self.tune(trigger).await?; // todo may or may not be right to bubble up
}
}
RequesterPolicy::AutoBail => {
@@ -328,7 +684,7 @@ impl Requester {
RequesterPolicy::Default => {}
}
}
// todo requests/second on scan bar aren't showing
// response came back without error, convert it to FeroxResponse
let ferox_response =
FeroxResponse::from(response, true, self.handles.config.output_level).await;
@@ -469,11 +825,6 @@ mod tests {
handles.stats.sync().await.unwrap();
}
/// helper to stay DRY
fn get_requests(handles: Arc<Handles>) -> usize {
handles.stats.data.requests.load(Ordering::Relaxed)
}
async fn create_scan(
handles: Arc<Handles>,
url: &str,
@@ -532,13 +883,13 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://localhost".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};
increment_errors(requester.handles.clone(), 49).await;
// 49 errors is false because we haven't hit the min threshold
assert_eq!(get_requests(requester.handles.clone()), 49);
assert_eq!(atomic_load!(requester.handles.stats.data.requests), 49);
assert_eq!(requester.should_enforce_policy(), None);
}
@@ -553,7 +904,7 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://localhost".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};
@@ -574,7 +925,7 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://localhost".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};
@@ -598,7 +949,7 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://localhost".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};
@@ -637,7 +988,7 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://one/one/stuff.php".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};
@@ -668,7 +1019,7 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://one/one/stuff.php".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};
@@ -687,7 +1038,7 @@ mod tests {
let requester = Requester {
handles,
target_url: "http://localhost".to_string(),
rate_limiter: None,
rate_limiter: RwLock::new(None),
policy_data: Default::default(),
};

View File

@@ -21,3 +21,11 @@ macro_rules! atomic_load {
$metric.load(Ordering::Relaxed);
};
}
/// Wrapper around `Atomic*.store` to save me from writing Ordering::Relaxed a bajillion times
#[macro_export]
macro_rules! atomic_store {
($metric:expr, $value:expr) => {
$metric.store($value, Ordering::Relaxed);
};
}