diff --git a/src/scanner/utils.rs b/src/scanner/utils.rs index f16293d..9866fb8 100644 --- a/src/scanner/utils.rs +++ b/src/scanner/utils.rs @@ -471,7 +471,7 @@ impl Requester { } /// wrapper for adjust_[up,down] functions, checks error levels to determine adjustment direction - async fn adjust_limit(&self, trigger: PolicyTrigger) -> Result<()> { + async fn adjust_limit(&self, trigger: PolicyTrigger, create_limiter: bool) -> Result<()> { let scan_errors = self.ferox_scan.num_errors(trigger); let policy_errors = atomic_load!(self.policy_data.errors, Ordering::SeqCst); @@ -493,7 +493,9 @@ impl Requester { if atomic_load!(self.policy_data.remove_limit) { self.set_rate_limiter(None).await?; atomic_store!(self.policy_data.remove_limit, false); - } else { + } else if create_limiter { + // create_limiter is really just used for unit testing situations, it's true anytime + // during actual execution let new_limit = self.policy_data.get_limit(); // limit is set from within the lock self.set_rate_limiter(Some(new_limit)).await?; } @@ -533,7 +535,7 @@ impl Requester { self.set_rate_limiter(Some(new_limit)).await?; } - self.adjust_limit(trigger).await?; + self.adjust_limit(trigger, true).await?; self.cool_down().await; Ok(()) @@ -733,11 +735,7 @@ mod tests { let scans = handles.ferox_scans().unwrap(); for _ in 0..num_errors { - if !url.ends_with('/') { - scans.increment_error(format!("{}/", url).as_str()); - } else { - scans.increment_error(url); - }; + scans.increment_error(format!("{}/", url).as_str()); } } @@ -750,11 +748,7 @@ mod tests { ) { let scans = handles.ferox_scans().unwrap(); for _ in 0..num_errors { - if !url.ends_with('/') { - scans.increment_status_code(format!("{}/", url).as_str(), code); - } else { - scans.increment_status_code(url, code); - }; + scans.increment_status_code(format!("{}/", url).as_str(), code); } } @@ -1217,10 +1211,13 @@ mod tests { fn increase_limit_heap_coverage_by_hitting_edge_cases() { let pd = PolicyData::new(RequesterPolicy::AutoBail, 7); pd.set_reqs_sec(400); + println!("{:?}", pd.heap.read().unwrap()); // debug derivation + pd.heap.write().unwrap().move_to(240); assert_eq!(pd.heap.write().unwrap().move_right(), 240); assert_eq!(pd.heap.write().unwrap().move_left(), 240); + pd.heap.write().unwrap().move_to(0); assert_eq!(pd.heap.write().unwrap().move_up(), 0); assert_eq!(pd.heap.write().unwrap().parent_value(), 400); @@ -1270,7 +1267,10 @@ mod tests { }; requester.policy_data.set_reqs_sec(400); - requester.adjust_limit(PolicyTrigger::Errors).await.unwrap(); + requester + .adjust_limit(PolicyTrigger::Errors, true) + .await + .unwrap(); assert_eq!(*requester.tuning_lock.lock().unwrap(), 1); assert_eq!(requester.policy_data.get_limit(), 300); @@ -1285,6 +1285,10 @@ mod tests { /// decrease the scan rate async fn adjust_limit_resets_streak_counter_on_downward_movement() { let (handles, _) = setup_requester_test(None).await; + let mut buckets = leaky_bucket::LeakyBuckets::new(); + let coordinator = buckets.coordinate().unwrap(); + tokio::spawn(async move { coordinator.await.expect("coordinator errored") }); + let limiter = buckets.rate_limiter().max(200).build().unwrap(); let scan = FeroxScan::default(); scan.add_error(); @@ -1295,7 +1299,7 @@ mod tests { tuning_lock: Mutex::new(0), ferox_scan: Arc::new(scan), target_url: "http://localhost".to_string(), - rate_limiter: RwLock::new(None), + rate_limiter: RwLock::new(Some(limiter)), policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7), }; @@ -1306,14 +1310,14 @@ mod tests { *guard = 2; drop(guard); - requester.adjust_limit(PolicyTrigger::Errors).await.unwrap(); + requester + .adjust_limit(PolicyTrigger::Errors, false) + .await + .unwrap(); assert_eq!(*requester.tuning_lock.lock().unwrap(), 0); assert_eq!(requester.policy_data.get_limit(), 100); - assert_eq!( - requester.rate_limiter.read().await.as_ref().unwrap().max(), - 100 - ); + assert_eq!(requester.policy_data.errors.load(Ordering::Relaxed), 2); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] @@ -1340,7 +1344,145 @@ mod tests { .remove_limit .store(true, Ordering::Relaxed); - requester.adjust_limit(PolicyTrigger::Errors).await.unwrap(); + requester + .adjust_limit(PolicyTrigger::Errors, true) + .await + .unwrap(); assert!(requester.rate_limiter.read().await.is_none()); } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + /// errors policytrigger should always be false, 403 is high ratio, and 429 is high ratio / 3 + async fn too_many_status_errors_returns_correct_values() { + let (handles, _) = setup_requester_test(None).await; + + let mut requester = Requester { + handles, + tuning_lock: Mutex::new(0), + ferox_scan: Arc::new(FeroxScan::default()), + target_url: "http://localhost".to_string(), + rate_limiter: RwLock::new(None), + policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7), + }; + + assert_eq!( + requester.too_many_status_errors(PolicyTrigger::Errors), + false + ); + + assert_eq!( + requester.too_many_status_errors(PolicyTrigger::Status429), + false + ); + requester.ferox_scan.progress_bar().set_position(10); + requester.ferox_scan.add_429(); + requester.ferox_scan.add_429(); + requester.ferox_scan.add_429(); + assert_eq!( + requester.too_many_status_errors(PolicyTrigger::Status429), + true + ); + + assert_eq!( + requester.too_many_status_errors(PolicyTrigger::Status403), + false + ); + requester.ferox_scan = Arc::new(FeroxScan::default()); + requester.ferox_scan.progress_bar().set_position(10); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + requester.ferox_scan.add_403(); + assert_eq!( + requester.too_many_status_errors(PolicyTrigger::Status403), + true + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + /// set_rate_limiter should exit early when new limit equals the current bucket's max + async fn set_rate_limiter_early_exit() { + let (handles, _) = setup_requester_test(None).await; + let mut buckets = leaky_bucket::LeakyBuckets::new(); + let coordinator = buckets.coordinate().unwrap(); + tokio::spawn(async move { coordinator.await.expect("coordinator errored") }); + let limiter = buckets.rate_limiter().max(200).build().unwrap(); + + let requester = Requester { + handles, + tuning_lock: Mutex::new(0), + ferox_scan: Arc::new(FeroxScan::default()), + target_url: "http://localhost".to_string(), + rate_limiter: RwLock::new(Some(limiter)), + policy_data: PolicyData::new(RequesterPolicy::AutoBail, 7), + }; + + requester.set_rate_limiter(Some(200)).await.unwrap(); + assert_eq!( + requester.rate_limiter.read().await.as_ref().unwrap().max(), + 200 + ); + requester.set_rate_limiter(Some(200)).await.unwrap(); + assert_eq!( + requester.rate_limiter.read().await.as_ref().unwrap().max(), + 200 + ); + } + + #[tokio::test(flavor = "multi_thread", worker_threads = 1)] + /// tune should set req/sec and rate_limiter, adjust the limit and cooldown + async fn tune_sets_expected_values_and_then_waits() { + let (handles, _) = setup_requester_test(None).await; + + let mut buckets = leaky_bucket::LeakyBuckets::new(); + let coordinator = buckets.coordinate().unwrap(); + tokio::spawn(async move { coordinator.await.expect("coordinator errored") }); + let limiter = buckets.rate_limiter().max(200).build().unwrap(); + + let scan = FeroxScan::new( + "http://localhost", + ScanType::Directory, + ScanOrder::Initial, + 1000, + OutputLevel::Default, + None, + ); + scan.set_status(ScanStatus::Running).unwrap(); + scan.add_429(); + + let requester = Requester { + handles, + tuning_lock: Mutex::new(0), + ferox_scan: scan.clone(), + target_url: "http://localhost".to_string(), + rate_limiter: RwLock::new(Some(limiter)), + policy_data: PolicyData::new(RequesterPolicy::AutoTune, 4), + }; + + let start = Instant::now(); + + let pb = scan.progress_bar(); + pb.set_length(1000); + pb.set_position(400); + sleep(Duration::new(1, 0)).await; // used to get req/sec up to 400 + + assert_eq!(requester.policy_data.errors.load(Ordering::Relaxed), 0); + + requester.tune(PolicyTrigger::Status429).await.unwrap(); + + assert_eq!(requester.policy_data.heap.read().unwrap().original, 400); + assert_eq!(requester.policy_data.get_limit(), 200); + assert_eq!( + requester.rate_limiter.read().await.as_ref().unwrap().max(), + 200 + ); + + scan.finish().unwrap(); + assert!(start.elapsed().as_millis() >= 2000); + } }