restructured statistics; created event_handlers module

This commit is contained in:
epi
2021-01-13 07:27:37 -06:00
parent e867898a31
commit 6b05fba068
11 changed files with 338 additions and 276 deletions

View File

@@ -0,0 +1,2 @@
mod statistics;
pub use statistics::StatsHandler;

View File

@@ -0,0 +1,112 @@
use crate::{
atomic_load,
config::CONFIGURATION,
progress::{add_bar, BarType},
statistics::{StatCommand, StatField, Stats},
};
use console::style;
use indicatif::ProgressBar;
use std::{
sync::{atomic::Ordering, Arc},
time::Instant,
};
use tokio::sync::mpsc::UnboundedReceiver;
/// event handler struct for updating statistics
#[derive(Debug)]
pub struct StatsHandler {
/// overall scan's progress bar
bar: ProgressBar,
/// Receiver half of mpsc from which `StatCommand`s are processed
receiver: UnboundedReceiver<StatCommand>,
/// data class that stores all statistics updates
stats: Arc<Stats>,
}
/// implementation of event handler for statistics
impl StatsHandler {
/// create new event handler builder
pub fn new(stats: Arc<Stats>, rx_stats: UnboundedReceiver<StatCommand>) -> Self {
// will be updated later via StatCommand; delay is for banner to print first
let bar = ProgressBar::hidden();
Self {
bar,
stats,
receiver: rx_stats,
}
}
/// Start a single consumer task (sc side of mpsc)
///
/// The consumer simply receives `StatCommands` and updates the given `Stats` object as appropriate
pub async fn start(&mut self) {
log::trace!("enter: statistics::start({:?})", self);
let start = Instant::now();
while let Some(command) = self.receiver.recv().await {
match command as StatCommand {
StatCommand::AddError(err) => {
self.stats.add_error(err);
self.increment_bar();
}
StatCommand::AddStatus(status) => {
self.stats.add_status_code(status);
self.increment_bar();
}
StatCommand::AddRequest => {
self.stats.add_request();
self.increment_bar();
}
StatCommand::Save => self
.stats
.save(start.elapsed().as_secs_f64(), &CONFIGURATION.output),
StatCommand::UpdateUsizeField(field, value) => {
let update_len = matches!(field, StatField::TotalScans);
self.stats.update_usize_field(field, value);
if update_len {
self.bar
.set_length(atomic_load!(self.stats.total_expected) as u64)
}
}
StatCommand::UpdateF64Field(field, value) => {
self.stats.update_f64_field(field, value)
}
StatCommand::CreateBar => {
self.bar = add_bar(
"",
atomic_load!(self.stats.total_expected) as u64,
BarType::Total,
);
}
StatCommand::LoadStats(filename) => {
self.stats.merge_from(&filename);
}
StatCommand::Exit => break,
}
}
self.bar.finish();
log::debug!("{:#?}", *self.stats);
log::trace!("exit: statistics::start")
}
/// Wrapper around incrementing the overall scan's progress bar
fn increment_bar(&self) {
let msg = format!(
"{}:{:<7} {}:{:<7}",
style("found").green(),
atomic_load!(self.stats.resources_discovered),
style("errors").red(),
atomic_load!(self.stats.errors),
);
self.bar.set_message(&msg);
self.bar.inc(1);
}
}

View File

@@ -12,6 +12,7 @@ pub mod reporter;
pub mod scan_manager;
pub mod scanner;
pub mod statistics;
mod event_handlers;
use crate::utils::{get_url_path_length, status_colorizer};
use console::{style, Color};

33
src/statistics/command.rs Normal file
View File

@@ -0,0 +1,33 @@
use super::{error::StatError, field::StatField};
use reqwest::StatusCode;
/// Protocol definition for updating a Stats object via mpsc
#[derive(Debug)]
pub enum StatCommand {
/// Add one to the total number of requests
AddRequest,
/// Add one to the proper field(s) based on the given `StatError`
AddError(StatError),
/// Add one to the proper field(s) based on the given `StatusCode`
AddStatus(StatusCode),
/// Create the progress bar (`BarType::Total`) that is updated from the stats thread
CreateBar,
/// Update a `Stats` field that corresponds to the given `StatField` by the given `usize` value
UpdateUsizeField(StatField, usize),
/// Update a `Stats` field that corresponds to the given `StatField` by the given `f64` value
UpdateF64Field(StatField, f64),
/// Save a `Stats` object to disk using `reporter::get_cached_file_handle`
Save,
/// Load a `Stats` object from disk
LoadStats(String),
/// Break out of the (infinite) mpsc receive loop
Exit,
}

View File

@@ -1,11 +1,9 @@
use super::{error::StatError, field::StatField};
use crate::{
config::CONFIGURATION,
progress::{add_bar, BarType},
reporter::{get_cached_file_handle, safe_file_write},
FeroxChannel, FeroxSerialize,
FeroxSerialize,
};
use console::style;
use indicatif::ProgressBar;
use reqwest::StatusCode;
use serde::{Deserialize, Serialize};
use std::{
@@ -13,34 +11,9 @@ use std::{
io::BufReader,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
Mutex,
},
time::Instant,
};
use tokio::{
sync::mpsc::{self, UnboundedReceiver, UnboundedSender},
task::JoinHandle,
};
/// Wrapper `Atomic*.fetch_add` to save me from writing Ordering::Relaxed a bajillion times
///
/// default is to increment by 1, second arg can be used to increment by a different value
macro_rules! atomic_increment {
($metric:expr) => {
$metric.fetch_add(1, Ordering::Relaxed);
};
($metric:expr, $value:expr) => {
$metric.fetch_add($value, Ordering::Relaxed);
};
}
/// Wrapper around `Atomic*.load` to save me from writing Ordering::Relaxed a bajillion times
macro_rules! atomic_load {
($metric:expr) => {
$metric.load(Ordering::Relaxed);
};
}
/// Data collection of statistics related to a scan
#[derive(Default, Deserialize, Debug, Serialize)]
@@ -63,10 +36,10 @@ pub struct Stats {
/// tracker for accumulating total number of requests expected (i.e. as a new scan is started
/// this value should increase by `expected_requests`
total_expected: AtomicUsize,
pub total_expected: AtomicUsize,
/// tracker for total number of errors encountered by the client
errors: AtomicUsize,
pub errors: AtomicUsize,
/// tracker for overall number of 2xx status codes seen by the client
successes: AtomicUsize,
@@ -128,7 +101,7 @@ pub struct Stats {
responses_filtered: AtomicUsize,
/// tracker for number of files found
resources_discovered: AtomicUsize,
pub resources_discovered: AtomicUsize,
/// tracker for number of errors triggered during URL formatting
url_format_errors: AtomicUsize,
@@ -176,7 +149,7 @@ impl Stats {
}
/// increment `requests` field by one
fn add_request(&self) {
pub fn add_request(&self) {
atomic_increment!(self.requests);
}
@@ -188,7 +161,7 @@ impl Stats {
}
/// save an instance of `Stats` to disk after updating the total runtime for the scan
fn save(&self, seconds: f64, location: &str) {
pub fn save(&self, seconds: f64, location: &str) {
let buffered_file = match get_cached_file_handle(location) {
Some(file) => file,
None => {
@@ -242,7 +215,7 @@ impl Stats {
/// - requests
/// - status_403s (when code is 403)
/// - errors (when code is [45]xx)
fn add_status_code(&self, status: StatusCode) {
pub fn add_status_code(&self, status: StatusCode) {
self.add_request();
if status.is_success() {
@@ -291,7 +264,7 @@ impl Stats {
}
/// Update a `Stats` field of type f64
fn update_f64_field(&self, field: StatField, value: f64) {
pub fn update_f64_field(&self, field: StatField, value: f64) {
if let StatField::DirScanTimes = field {
if let Ok(mut locked_times) = self.directory_scan_times.lock() {
locked_times.push(value);
@@ -300,7 +273,7 @@ impl Stats {
}
/// Update a `Stats` field of type usize
fn update_usize_field(&self, field: StatField, value: usize) {
pub fn update_usize_field(&self, field: StatField, value: usize) {
match field {
StatField::ExpectedPerScan => {
atomic_increment!(self.expected_per_scan, value);
@@ -402,228 +375,13 @@ impl Stats {
}
}
#[derive(Debug)]
/// Enum variants used to inform the `StatCommand` protocol what `Stats` fields should be updated
pub enum StatError {
/// Represents a 403 response code
Status403,
/// Represents a timeout error
Timeout,
/// Represents a URL formatting error
UrlFormat,
/// Represents an error encountered during redirection
Redirection,
/// Represents an error encountered during connection
Connection,
/// Represents an error resulting from the client's request
Request,
/// Represents any other error not explicitly defined above
Other,
}
/// Protocol definition for updating a Stats object via mpsc
#[derive(Debug)]
pub enum StatCommand {
/// Add one to the total number of requests
AddRequest,
/// Add one to the proper field(s) based on the given `StatError`
AddError(StatError),
/// Add one to the proper field(s) based on the given `StatusCode`
AddStatus(StatusCode),
/// Create the progress bar (`BarType::Total`) that is updated from the stats thread
CreateBar,
/// Update a `Stats` field that corresponds to the given `StatField` by the given `usize` value
UpdateUsizeField(StatField, usize),
/// Update a `Stats` field that corresponds to the given `StatField` by the given `f64` value
UpdateF64Field(StatField, f64),
/// Save a `Stats` object to disk using `reporter::get_cached_file_handle`
Save,
/// Load a `Stats` object from disk
LoadStats(String),
/// Break out of the (infinite) mpsc receive loop
Exit,
}
/// Enum representing fields whose updates need to be performed in batches instead of one at
/// a time
#[derive(Debug)]
pub enum StatField {
/// Due to the necessary order of events, the number of requests expected to be sent isn't
/// known until after `statistics::initialize` is called. This command allows for updating
/// the `expected_per_scan` field after initialization
ExpectedPerScan,
/// Translates to `total_scans`
TotalScans,
/// Translates to `links_extracted`
LinksExtracted,
/// Translates to `total_expected`
TotalExpected,
/// Translates to `wildcards_filtered`
WildcardsFiltered,
/// Translates to `responses_filtered`
ResponsesFiltered,
/// Translates to `resources_discovered`
ResourcesDiscovered,
/// Translates to `initial_targets`
InitialTargets,
/// Translates to `directory_scan_times`; assumes a single append to the vector
DirScanTimes,
}
/// Spawn a single consumer task (sc side of mpsc)
///
/// The consumer simply receives `StatCommands` and updates the given `Stats` object as appropriate
pub async fn spawn_statistics_handler(
mut rx_stats: UnboundedReceiver<StatCommand>,
stats: Arc<Stats>,
tx_stats: UnboundedSender<StatCommand>,
) {
log::trace!(
"enter: spawn_statistics_handler({:?}, {:?}, {:?})",
rx_stats,
stats,
tx_stats
);
// will be updated later via StatCommand; delay is for banner to print first
let mut bar = ProgressBar::hidden();
let start = Instant::now();
while let Some(command) = rx_stats.recv().await {
match command as StatCommand {
StatCommand::AddError(err) => {
stats.add_error(err);
increment_bar(&bar, stats.clone());
}
StatCommand::AddStatus(status) => {
stats.add_status_code(status);
increment_bar(&bar, stats.clone());
}
StatCommand::AddRequest => {
stats.add_request();
increment_bar(&bar, stats.clone());
}
StatCommand::Save => stats.save(start.elapsed().as_secs_f64(), &CONFIGURATION.output),
StatCommand::UpdateUsizeField(field, value) => {
let update_len = matches!(field, StatField::TotalScans);
stats.update_usize_field(field, value);
if update_len {
bar.set_length(atomic_load!(stats.total_expected) as u64)
}
}
StatCommand::UpdateF64Field(field, value) => stats.update_f64_field(field, value),
StatCommand::CreateBar => {
bar = add_bar(
"",
atomic_load!(stats.total_expected) as u64,
BarType::Total,
);
}
StatCommand::LoadStats(filename) => {
stats.merge_from(&filename);
}
StatCommand::Exit => break,
}
}
bar.finish();
log::debug!("{:#?}", *stats);
log::trace!("exit: spawn_statistics_handler")
}
/// Wrapper around incrementing the overall scan's progress bar
fn increment_bar(bar: &ProgressBar, stats: Arc<Stats>) {
let msg = format!(
"{}:{:<7} {}:{:<7}",
style("found").green(),
atomic_load!(stats.resources_discovered),
style("errors").red(),
atomic_load!(stats.errors),
);
bar.set_message(&msg);
bar.inc(1);
}
/// Initialize new `Stats` object and the sc side of an mpsc channel that is responsible for
/// updates to the aforementioned object.
pub fn initialize() -> (Arc<Stats>, UnboundedSender<StatCommand>, JoinHandle<()>) {
log::trace!("enter: initialize");
let stats_tracker = Arc::new(Stats::new());
let stats_cloned = stats_tracker.clone();
let (tx_stats, rx_stats): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let tx_stats_cloned = tx_stats.clone();
let stats_thread = tokio::spawn(async move {
spawn_statistics_handler(rx_stats, stats_cloned, tx_stats_cloned).await
});
log::trace!(
"exit: initialize -> ({:?}, {:?}, {:?})",
stats_tracker,
tx_stats,
stats_thread
);
(stats_tracker, tx_stats, stats_thread)
}
#[cfg(test)]
mod tests {
use super::super::*;
use super::*;
use std::fs::write;
use tempfile::NamedTempFile;
/// simple helper to reduce code reuse
fn setup_stats_test() -> (Arc<Stats>, UnboundedSender<StatCommand>, JoinHandle<()>) {
initialize()
}
/// another helper to stay DRY; must be called after any sent commands and before any checks
/// performed against the Stats object
async fn teardown_stats_test(sender: UnboundedSender<StatCommand>, handle: JoinHandle<()>) {
// send exit and await, once the await completes, stats should be updated
sender.send(StatCommand::Exit).unwrap_or_default();
handle.await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// when sent StatCommand::Exit, function should exit its while loop (runs forever otherwise)
async fn statistics_handler_exits() {
let (_, sender, handle) = setup_stats_test();
sender.send(StatCommand::Exit).unwrap_or_default();
handle.await.unwrap(); // blocks on the handler's while loop
// if we've made it here, the test has succeeded
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// when sent StatCommand::AddRequest, stats object should reflect the change
async fn statistics_handler_increments_requests() {
@@ -808,26 +566,4 @@ mod tests {
stats.update_runtime(20.2);
assert!((stats.total_runtime.lock().unwrap()[0] - 20.2).abs() < f64::EPSILON);
}
#[test]
/// Stats::save should write contents of Stats to disk
fn save_writes_stats_object_to_disk() {
let stats = Stats::new();
stats.add_request();
stats.add_request();
stats.add_request();
stats.add_request();
stats.add_error(StatError::Timeout);
stats.add_error(StatError::Timeout);
stats.add_error(StatError::Timeout);
stats.add_error(StatError::Timeout);
stats.add_status_code(StatusCode::OK);
stats.add_status_code(StatusCode::OK);
stats.add_status_code(StatusCode::OK);
let outfile = "/tmp/stuff";
stats.save(174.33, outfile);
assert!(stats.as_json().contains("statistics"));
assert!(stats.as_json().contains("11")); // requests made
assert!(stats.as_str().is_empty());
}
}

24
src/statistics/error.rs Normal file
View File

@@ -0,0 +1,24 @@
#[derive(Debug)]
/// Enum variants used to inform the `StatCommand` protocol what `Stats` fields should be updated
pub enum StatError {
/// Represents a 403 response code
Status403,
/// Represents a timeout error
Timeout,
/// Represents a URL formatting error
UrlFormat,
/// Represents an error encountered during redirection
Redirection,
/// Represents an error encountered during connection
Connection,
/// Represents an error resulting from the client's request
Request,
/// Represents any other error not explicitly defined above
Other,
}

33
src/statistics/field.rs Normal file
View File

@@ -0,0 +1,33 @@
/// Enum representing fields whose updates need to be performed in batches instead of one at
/// a time
#[derive(Debug)]
pub enum StatField {
/// Due to the necessary order of events, the number of requests expected to be sent isn't
/// known until after `statistics::initialize` is called. This command allows for updating
/// the `expected_per_scan` field after initialization
ExpectedPerScan,
/// Translates to `total_scans`
TotalScans,
/// Translates to `links_extracted`
LinksExtracted,
/// Translates to `total_expected`
TotalExpected,
/// Translates to `wildcards_filtered`
WildcardsFiltered,
/// Translates to `responses_filtered`
ResponsesFiltered,
/// Translates to `resources_discovered`
ResourcesDiscovered,
/// Translates to `initial_targets`
InitialTargets,
/// Translates to `directory_scan_times`; assumes a single append to the vector
DirScanTimes,
}

29
src/statistics/init.rs Normal file
View File

@@ -0,0 +1,29 @@
use super::{command::StatCommand, data::Stats};
use crate::{event_handlers::StatsHandler, FeroxChannel};
use std::sync::Arc;
use tokio::{
sync::mpsc::{self, UnboundedSender},
task::JoinHandle,
};
/// Initialize new `Stats` object and the sc side of an mpsc channel that is responsible for
/// updates to the aforementioned object.
pub fn initialize() -> (Arc<Stats>, UnboundedSender<StatCommand>, JoinHandle<()>) {
log::trace!("enter: initialize");
let stats_tracker = Arc::new(Stats::new());
let (tx_stats, rx_stats): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let mut handler = StatsHandler::new(stats_tracker.clone(), rx_stats);
let stats_thread = tokio::spawn(async move { handler.start().await });
log::trace!(
"exit: initialize -> ({:?}, {:?}, {:?})",
stats_tracker,
tx_stats,
stats_thread
);
(stats_tracker, tx_stats, stats_thread)
}

23
src/statistics/macros.rs Normal file
View File

@@ -0,0 +1,23 @@
#![macro_use]
/// Wrapper `Atomic*.fetch_add` to save me from writing Ordering::Relaxed a bajillion times
///
/// default is to increment by 1, second arg can be used to increment by a different value
#[macro_export]
macro_rules! atomic_increment {
($metric:expr) => {
$metric.fetch_add(1, Ordering::Relaxed);
};
($metric:expr, $value:expr) => {
$metric.fetch_add($value, Ordering::Relaxed);
};
}
/// Wrapper around `Atomic*.load` to save me from writing Ordering::Relaxed a bajillion times
#[macro_export]
macro_rules! atomic_load {
($metric:expr) => {
$metric.load(Ordering::Relaxed);
};
}

17
src/statistics/mod.rs Normal file
View File

@@ -0,0 +1,17 @@
mod error;
mod macros;
mod data;
mod command;
mod field;
mod init;
#[cfg(test)]
mod tests;
pub use self::command::StatCommand;
pub use self::data::Stats;
pub use self::error::StatError;
pub use self::field::StatField;
pub use self::init::initialize;
#[cfg(test)]
use self::tests::{setup_stats_test, teardown_stats_test};

52
src/statistics/tests.rs Normal file
View File

@@ -0,0 +1,52 @@
use super::*;
use crate::FeroxSerialize;
use reqwest::StatusCode;
use std::sync::Arc;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
/// simple helper to reduce code reuse
pub fn setup_stats_test() -> (Arc<Stats>, UnboundedSender<StatCommand>, JoinHandle<()>) {
initialize()
}
/// another helper to stay DRY; must be called after any sent commands and before any checks
/// performed against the Stats object
pub async fn teardown_stats_test(sender: UnboundedSender<StatCommand>, handle: JoinHandle<()>) {
// send exit and await, once the await completes, stats should be updated
sender.send(StatCommand::Exit).unwrap_or_default();
handle.await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// when sent StatCommand::Exit, function should exit its while loop (runs forever otherwise)
async fn statistics_handler_exits() {
let (_, sender, handle) = setup_stats_test();
sender.send(StatCommand::Exit).unwrap_or_default();
handle.await.unwrap(); // blocks on the handler's while loop
// if we've made it here, the test has succeeded
}
#[test]
/// Stats::save should write contents of Stats to disk
fn save_writes_stats_object_to_disk() {
let stats = Stats::new();
stats.add_request();
stats.add_request();
stats.add_request();
stats.add_request();
stats.add_error(StatError::Timeout);
stats.add_error(StatError::Timeout);
stats.add_error(StatError::Timeout);
stats.add_error(StatError::Timeout);
stats.add_status_code(StatusCode::OK);
stats.add_status_code(StatusCode::OK);
stats.add_status_code(StatusCode::OK);
let outfile = "/tmp/stuff";
stats.save(174.33, outfile);
assert!(stats.as_json().contains("statistics"));
assert!(stats.as_json().contains("11")); // requests made
assert!(stats.as_str().is_empty());
}