diff --git a/src/banner/container.rs b/src/banner/container.rs index a2767de..3de45e0 100644 --- a/src/banner/container.rs +++ b/src/banner/container.rs @@ -1,7 +1,7 @@ use super::entry::BannerEntry; use crate::{ config::Configuration, - statistics::StatCommand, + event_handlers::Command, utils::{make_request, status_colorizer}, VERSION, }; @@ -348,7 +348,7 @@ by Ben "epi" Risher {} ver: {}"#, &mut self, client: &Client, url: &str, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Result<()> { log::trace!("enter: needs_update({:?}, {}, {:?})", client, url, tx_stats); diff --git a/src/banner/tests.rs b/src/banner/tests.rs index 5a9102e..ce01a48 100644 --- a/src/banner/tests.rs +++ b/src/banner/tests.rs @@ -2,7 +2,7 @@ use super::container::UpdateStatus; use super::*; use crate::{ config::{Configuration, CONFIGURATION}, - statistics::StatCommand, + statistics::Command, FeroxChannel, }; use httpmock::Method::GET; @@ -58,7 +58,7 @@ async fn banner_intialize_without_queries() { #[tokio::test(flavor = "multi_thread", worker_threads = 1)] /// test that async fn banner_needs_update_returns_unknown_with_bad_url() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let mut banner = Banner::new(&[String::from("http://localhost")], &CONFIGURATION); @@ -79,7 +79,7 @@ async fn banner_needs_update_returns_up_to_date() { then.status(200).body("{\"tag_name\":\"v1.1.0\"}"); }); - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION); banner.version = String::from("1.1.0"); @@ -102,7 +102,7 @@ async fn banner_needs_update_returns_out_of_date() { then.status(200).body("{\"tag_name\":\"v1.1.0\"}"); }); - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION); banner.version = String::from("1.0.1"); @@ -127,7 +127,7 @@ async fn banner_needs_update_returns_unknown_on_timeout() { .delay(Duration::from_secs(8)); }); - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION); @@ -149,7 +149,7 @@ async fn banner_needs_update_returns_unknown_on_bad_json_response() { then.status(200).body("not json"); }); - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION); @@ -172,7 +172,7 @@ async fn banner_needs_update_returns_unknown_on_json_without_correct_tag() { .body("{\"no tag_name\": \"doesn't exist\"}"); }); - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION); banner.version = String::from("1.0.1"); diff --git a/src/event_handlers/builder.rs b/src/event_handlers/builder.rs new file mode 100644 index 0000000..921d59a --- /dev/null +++ b/src/event_handlers/builder.rs @@ -0,0 +1,41 @@ +// use crate::event_handlers::Handlers; +// use crate::filters::FilterCommand; +// use crate::statistics::StatCommand; +// use crate::traits::HandlerCommand; +// use tokio::sync::mpsc::UnboundedSender; +// +// /// todo +// #[derive(Default)] +// pub struct HandlersBuilder { +// transmitters: Vec>, +// } +// +// /// todo +// impl HandlersBuilder { +// /// todo +// pub fn new() -> Self { +// Self { +// ..Default::default() +// } +// } +// +// /// todo +// pub fn transmitter(&mut self, transmitter: UnboundedSender) -> &mut Self { +// self.transmitters.push(transmitter); +// self +// } +// +// /// todo +// pub fn build(&self) -> Handlers { +// // let tx_stats: UnboundedSender = self.transmitters[0]; +// // let tx_filters: UnboundedSender = self.transmitters[1]; +// +// for tx in self.transmitters {} +// +// Handlers { +// handles: Vec::new(), +// tx_stats, +// tx_filters, +// } +// } +// } diff --git a/src/statistics/command.rs b/src/event_handlers/command.rs similarity index 76% rename from src/statistics/command.rs rename to src/event_handlers/command.rs index 0d6ac1d..5bcd22a 100644 --- a/src/statistics/command.rs +++ b/src/event_handlers/command.rs @@ -1,9 +1,10 @@ -use super::{error::StatError, field::StatField}; +use crate::statistics::{StatError, StatField}; +use crate::traits::FeroxFilter; use reqwest::StatusCode; -/// Protocol definition for updating a Stats object via mpsc +/// Protocol definition for updating an event handler via mpsc #[derive(Debug)] -pub enum StatCommand { +pub enum Command { /// Add one to the total number of requests AddRequest, @@ -28,6 +29,9 @@ pub enum StatCommand { /// Load a `Stats` object from disk LoadStats(String), + /// Add a FeroxFilter implementor to FilterHandler's instance of FeroxFilters + AddFilter(Box), + /// Break out of the (infinite) mpsc receive loop Exit, } diff --git a/src/event_handlers/container.rs b/src/event_handlers/container.rs new file mode 100644 index 0000000..0a0fbe5 --- /dev/null +++ b/src/event_handlers/container.rs @@ -0,0 +1,151 @@ +use super::Command; +use crate::{filters::FeroxFilters, statistics::Stats, CommandSender}; +use anyhow::Result; +use std::sync::Arc; +use tokio::task::JoinHandle; + +type Joiner = JoinHandle>; + +pub struct Tasks { + pub terminal: Joiner, + pub file: Option, + pub stats: Joiner, + pub filters: Joiner, +} + +impl Tasks { + pub fn new(terminal: Joiner, file: Option, stats: Joiner, filters: Joiner) -> Self { + Self { + terminal, + file, + stats, + filters, + } + } +} + +#[derive(Clone, Debug)] +pub struct StatsHandle { + pub data: Arc, + pub tx: CommandSender, +} + +impl StatsHandle { + pub fn new(data: Arc, tx: CommandSender) -> Self { + Self { data, tx } + } + pub fn send(&self, command: Command) -> Result<()> { + self.tx.send(command)?; + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct FiltersHandle { + pub data: Arc, + pub tx: CommandSender, +} + +impl FiltersHandle { + pub fn new(data: Arc, tx: CommandSender) -> Self { + Self { data, tx } + } + + pub fn send(&self, command: Command) -> Result<()> { + self.tx.send(command)?; + Ok(()) + } +} + +// #[derive(Clone, Debug)] +// pub struct ReporterTerminalHandle { +// pub data: Arc, +// pub task: JoinHandle>, +// pub transmitter: CommandSender, +// } +// +// impl StatsHandle { +// pub fn new(data: Arc, transmitter: CommandSender, task: JoinHandle>) -> Self { +// Self { +// data, +// task, +// transmitter, +// } +// } +// } + +// #[derive(Clone, Debug)] +// pub enum EventHandle { +// Stats(StatsHandle), +// Filters(FiltersHandle), +// } +// +// impl EventHandle { +// pub fn send(&self, cmd: Command) -> Result<()> { +// if let EventHandle::Filters(handle) = self { +// send_command!(handle.transmitter, cmd); +// } +// Ok(()) +// } +// +// +// pub fn stats(&self) -> Result<&StatsHandle> { +// match self { +// EventHandle::Stats(handle) => Ok(handle), +// _ => { +// bail!("no underlying StatsHandle found") +// } +// } +// } +// +// pub fn filters(&self) -> Result<&FiltersHandle> { +// match self { +// EventHandle::Filters(handle) => Ok(handle), +// _ => { +// bail!("no underlying FiltersHandle found") +// } +// } +// } +// } + +// todo need to move these into their proper places + +/// todo docs everywhere in this file + +// /// todo docs if this pans out +#[derive(Clone, Debug)] +pub struct Handles { + pub stats: StatsHandle, + pub filters: FiltersHandle, +} + +impl Handles { + pub fn new(stats: StatsHandle, filters: FiltersHandle) -> Self { + Self { stats, filters } + } + + // /// todo + // /// + // /// expected order of task completion + // /// - terminal + // /// - file (if present) + // /// - filters + // /// - stats + // pub async fn clean_up(self) -> Result<()> { + // // todo trace + // + // send_command!(self.filters.tx, Command::Exit); // send exit command and await the end of the future + // self.filters + // .task + // .await + // .with_context(|| fmt_err("Could not await a filters handler's receiver"))?; + // + // send_command!(self.stats.tx, Command::Exit); + // self.stats + // .task + // .await + // .with_context(|| fmt_err("Could not await a stats handler's receiver"))?; + // + // Ok(()) + // } +} diff --git a/src/event_handlers/filters.rs b/src/event_handlers/filters.rs new file mode 100644 index 0000000..d0bb7e5 --- /dev/null +++ b/src/event_handlers/filters.rs @@ -0,0 +1,66 @@ +use super::*; +use crate::{filters::FeroxFilters, FeroxChannel}; +use anyhow::Result; +use std::sync::Arc; +use tokio::{ + sync::mpsc::{self, UnboundedReceiver}, + task::JoinHandle, +}; + +/// event handler for updating a single data structure of all active filters +#[derive(Debug)] +pub struct FiltersHandler { + /// collection of generic type `T` where `T` is some collection of data + data: Arc, + + /// Receiver half of mpsc from which `Command`s are processed + receiver: UnboundedReceiver, +} + +/// implementation of event handler for filters + +impl FiltersHandler { + /// create new event handler + pub fn new(data: Arc, receiver: UnboundedReceiver) -> Self { + Self { data, receiver } + } + + /// Initialize new `FeroxFilters` and the sc side of an mpsc channel that is responsible for + /// updates to the aforementioned object. + pub fn initialize() -> (JoinHandle>, FiltersHandle) { + log::trace!("enter: initialize"); + + let data = Arc::new(FeroxFilters::default()); + let (tx, rx): FeroxChannel = mpsc::unbounded_channel(); + + let mut handler = Self::new(data.clone(), rx); + + let task = tokio::spawn(async move { handler.start().await }); + + let event_handle = FiltersHandle::new(data, tx); + + log::trace!("exit: initialize -> ({:?})", event_handle); + + (task, event_handle) + } + + /// Start a single consumer task (sc side of mpsc) + /// + /// The consumer simply receives `Command` and acts accordingly + pub async fn start(&mut self) -> Result<()> { + log::trace!("enter: start({:?})", self); + + while let Some(command) = self.receiver.recv().await { + match command { + Command::AddFilter(filter) => { + self.data.push(filter)?; + } + Command::Exit => break, + _ => {} // no other commands needed for FilterHandler + } + } + + log::trace!("exit: start"); + Ok(()) + } +} diff --git a/src/event_handlers/mod.rs b/src/event_handlers/mod.rs index a6eb005..bb8b6f9 100644 --- a/src/event_handlers/mod.rs +++ b/src/event_handlers/mod.rs @@ -1,3 +1,11 @@ //! collection of event handlers (typically long-running tokio spawned tasks) mod statistics; -pub use statistics::StatsHandler; +mod filters; +mod container; +mod builder; +mod command; + +pub use self::command::Command; +pub use self::container::{FiltersHandle, Handles, StatsHandle, Tasks}; +pub use self::filters::FiltersHandler; +pub use self::statistics::StatsHandler; diff --git a/src/event_handlers/statistics.rs b/src/event_handlers/statistics.rs index f0f7f77..81cf6ab 100644 --- a/src/event_handlers/statistics.rs +++ b/src/event_handlers/statistics.rs @@ -1,13 +1,16 @@ +use super::*; use crate::{ config::CONFIGURATION, progress::{add_bar, BarType}, - statistics::{StatCommand, StatField, Stats}, + statistics::{StatField, Stats}, + FeroxChannel, }; use anyhow::Result; use console::style; use indicatif::ProgressBar; use std::{sync::Arc, time::Instant}; -use tokio::sync::mpsc::UnboundedReceiver; +use tokio::sync::mpsc::{self, UnboundedReceiver}; +use tokio::task::JoinHandle; /// event handler struct for updating statistics #[derive(Debug)] @@ -16,7 +19,7 @@ pub struct StatsHandler { bar: ProgressBar, /// Receiver half of mpsc from which `StatCommand`s are processed - receiver: UnboundedReceiver, + receiver: UnboundedReceiver, /// data class that stores all statistics updates stats: Arc, @@ -24,8 +27,8 @@ pub struct StatsHandler { /// implementation of event handler for statistics impl StatsHandler { - /// create new event handler builder - pub fn new(stats: Arc, rx_stats: UnboundedReceiver) -> Self { + /// create new event handler + fn new(stats: Arc, rx_stats: UnboundedReceiver) -> Self { // will be updated later via StatCommand; delay is for banner to print first let bar = ProgressBar::hidden(); @@ -39,30 +42,30 @@ impl StatsHandler { /// Start a single consumer task (sc side of mpsc) /// /// The consumer simply receives `StatCommands` and updates the given `Stats` object as appropriate - pub async fn start(&mut self) -> Result<()> { + async fn start(&mut self) -> Result<()> { log::trace!("enter: start({:?})", self); let start = Instant::now(); while let Some(command) = self.receiver.recv().await { - match command as StatCommand { - StatCommand::AddError(err) => { + match command as Command { + Command::AddError(err) => { self.stats.add_error(err); self.increment_bar(); } - StatCommand::AddStatus(status) => { + Command::AddStatus(status) => { self.stats.add_status_code(status); self.increment_bar(); } - StatCommand::AddRequest => { + Command::AddRequest => { self.stats.add_request(); self.increment_bar(); } - StatCommand::Save => { + Command::Save => { self.stats .save(start.elapsed().as_secs_f64(), &CONFIGURATION.output)?; } - StatCommand::UpdateUsizeField(field, value) => { + Command::UpdateUsizeField(field, value) => { let update_len = matches!(field, StatField::TotalScans); self.stats.update_usize_field(field, value); @@ -70,16 +73,15 @@ impl StatsHandler { self.bar.set_length(self.stats.total_expected() as u64) } } - StatCommand::UpdateF64Field(field, value) => { - self.stats.update_f64_field(field, value) - } - StatCommand::CreateBar => { + Command::UpdateF64Field(field, value) => self.stats.update_f64_field(field, value), + Command::CreateBar => { self.bar = add_bar("", self.stats.total_expected() as u64, BarType::Total); } - StatCommand::LoadStats(filename) => { + Command::LoadStats(filename) => { self.stats.merge_from(&filename)?; } - StatCommand::Exit => break, + Command::Exit => break, + _ => {} // no more commands needed } } @@ -103,4 +105,23 @@ impl StatsHandler { self.bar.set_message(&msg); self.bar.inc(1); } + + /// Initialize new `Stats` object and the sc side of an mpsc channel that is responsible for + /// updates to the aforementioned object. + pub fn initialize() -> (JoinHandle>, StatsHandle) { + log::trace!("enter: initialize"); + + let data = Arc::new(Stats::new()); + let (tx, rx): FeroxChannel = mpsc::unbounded_channel(); + + let mut handler = StatsHandler::new(data.clone(), rx); + + let task = tokio::spawn(async move { handler.start().await }); + + let event_handle = StatsHandle::new(data, tx); + + log::trace!("exit: initialize -> ({:?})", event_handle); + + (task, event_handle) + } } diff --git a/src/extractor/builder.rs b/src/extractor/builder.rs index 4670f21..b877d60 100644 --- a/src/extractor/builder.rs +++ b/src/extractor/builder.rs @@ -34,7 +34,7 @@ pub struct ExtractorBuilder<'a> { config: Option<&'a Configuration>, /// transmitter to the mpsc that handles statistics gathering - tx_stats: Option>, + tx_stats: Option>, /// transmitter to the mpsc that handles recursive scan calls tx_recursion: Option>, @@ -106,7 +106,7 @@ impl<'a> ExtractorBuilder<'a> { } /// builder call to set `tx_stats` - pub fn stats_transmitter(&mut self, tx_stats: UnboundedSender) -> &mut Self { + pub fn stats_transmitter(&mut self, tx_stats: UnboundedSender) -> &mut Self { self.tx_stats = Some(tx_stats); self } diff --git a/src/extractor/container.rs b/src/extractor/container.rs index ef3e322..b41ccb1 100644 --- a/src/extractor/container.rs +++ b/src/extractor/container.rs @@ -1,12 +1,10 @@ use super::*; use crate::{ client, + event_handlers::Command::UpdateUsizeField, scanner::{send_report, should_filter_response, try_recursion}, - statistics::{ - StatCommand::UpdateUsizeField, - StatField::{LinksExtracted, TotalExpected}, - }, - update_stat, + send_command, + statistics::StatField::{LinksExtracted, TotalExpected}, utils::{format_url, make_request}, }; use anyhow::{bail, Context, Result}; @@ -42,7 +40,7 @@ pub struct Extractor<'a> { pub(super) config: &'a Configuration, /// transmitter to the mpsc that handles statistics gathering - pub(super) tx_stats: UnboundedSender, + pub(super) tx_stats: UnboundedSender, /// transmitter to the mpsc that handles recursive scan calls pub(super) tx_recursion: UnboundedSender, @@ -399,8 +397,8 @@ impl<'a> Extractor<'a> { fn update_stats(&self, num_links: usize) { let multiplier = self.config.extensions.len().max(1); - update_stat!(self.tx_stats, UpdateUsizeField(LinksExtracted, num_links)); - update_stat!( + send_command!(self.tx_stats, UpdateUsizeField(LinksExtracted, num_links)); + send_command!( self.tx_stats, UpdateUsizeField(TotalExpected, num_links * multiplier) ); diff --git a/src/extractor/mod.rs b/src/extractor/mod.rs index 59ffa2a..c5c267b 100644 --- a/src/extractor/mod.rs +++ b/src/extractor/mod.rs @@ -9,9 +9,7 @@ pub use self::builder::ExtractorBuilder; pub use self::container::Extractor; use crate::{ - config::Configuration, - scan_manager::FeroxScans, - statistics::{StatCommand, Stats}, + config::Configuration, event_handlers::Command, scan_manager::FeroxScans, statistics::Stats, FeroxResponse, }; use regex::Regex; diff --git a/src/extractor/tests.rs b/src/extractor/tests.rs index 5636dce..20d8c20 100644 --- a/src/extractor/tests.rs +++ b/src/extractor/tests.rs @@ -44,7 +44,7 @@ fn get_test_response() -> FeroxResponse { /// creates a single extractor that can be used to test standalone functions fn setup_extractor(target: ExtractionTarget) -> Extractor<'static> { let (tx_dir, _): FeroxChannel = mpsc::unbounded_channel(); - let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); let (tx_term, _): FeroxChannel = mpsc::unbounded_channel(); let stats = Arc::new(Stats::new()); @@ -143,7 +143,7 @@ fn extractor_get_sub_paths_from_path_with_an_absolute_word() { /// test that an ExtractorBuilder without a FeroxResponse and without a URL bails fn extractor_builder_bails_when_neither_required_field_is_set() { let (tx_dir, _): FeroxChannel = mpsc::unbounded_channel(); - let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); let (tx_term, _): FeroxChannel = mpsc::unbounded_channel(); let stats = Arc::new(Stats::new()); @@ -167,7 +167,7 @@ fn extractor_with_non_base_url_bails() -> Result<()> { let link = "admin"; let (tx_dir, _): FeroxChannel = mpsc::unbounded_channel(); - let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); let (tx_term, _): FeroxChannel = mpsc::unbounded_channel(); let stats = Arc::new(Stats::new()); @@ -236,7 +236,7 @@ fn extractor_add_link_to_set_of_links_with_non_base_url() { /// domain; expect an empty set returned async fn extractor_get_links_with_absolute_url_that_differs_from_target_domain() -> Result<()> { let (tx_dir, _): FeroxChannel = mpsc::unbounded_channel(); - let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); let (tx_term, _): FeroxChannel = mpsc::unbounded_channel(); let stats = Arc::new(Stats::new()); @@ -283,7 +283,7 @@ async fn extractor_get_links_with_absolute_url_that_differs_from_target_domain() /// test that /robots.txt is correctly requested given a base url (happy path) async fn request_robots_txt_without_proxy() -> Result<()> { let (tx_dir, _): FeroxChannel = mpsc::unbounded_channel(); - let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); let (tx_term, _): FeroxChannel = mpsc::unbounded_channel(); let stats = Arc::new(Stats::new()); let config = Configuration::new(); @@ -323,7 +323,7 @@ async fn request_robots_txt_without_proxy() -> Result<()> { /// test that /robots.txt is correctly requested given a base url (happy path) when a proxy is used async fn request_robots_txt_with_proxy() -> Result<()> { let (tx_dir, _): FeroxChannel = mpsc::unbounded_channel(); - let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx_stats, _): FeroxChannel = mpsc::unbounded_channel(); let (tx_term, _): FeroxChannel = mpsc::unbounded_channel(); let stats = Arc::new(Stats::new()); let mut config = Configuration::new(); diff --git a/src/filters/container.rs b/src/filters/container.rs new file mode 100644 index 0000000..ab38f1b --- /dev/null +++ b/src/filters/container.rs @@ -0,0 +1,21 @@ +use super::FeroxFilter; +use anyhow::Result; +use std::sync::Mutex; + +/// Container around a collection of `FeroxFilters`s +#[derive(Debug, Default)] +pub struct FeroxFilters { + /// collection of `FeroxFilters` + pub filters: Mutex>>, +} + +/// implementation of FeroxFilter collection +impl FeroxFilters { + /// add a single FeroxFilter to the collection + pub fn push(&self, filter: Box) -> Result<()> { + if let Ok(mut unlocked) = self.filters.lock() { + unlocked.push(filter) + } + Ok(()) + } +} diff --git a/src/filters/mod.rs b/src/filters/mod.rs index 78a675c..e1faf1a 100644 --- a/src/filters/mod.rs +++ b/src/filters/mod.rs @@ -6,9 +6,11 @@ mod lines; mod size; mod regex; mod similarity; +mod container; #[cfg(test)] mod tests; +pub use self::container::FeroxFilters; pub use self::lines::LinesFilter; pub use self::regex::RegexFilter; pub use self::similarity::SimilarityFilter; diff --git a/src/heuristics.rs b/src/heuristics.rs index 5ef3991..c0261dc 100644 --- a/src/heuristics.rs +++ b/src/heuristics.rs @@ -1,8 +1,8 @@ use crate::{ config::{CONFIGURATION, PROGRESS_PRINTER}, + event_handlers::Command, filters::WildcardFilter, scanner::should_filter_response, - statistics::StatCommand, utils::{ferox_print, format_url, get_url_path_length, make_request, status_colorizer}, FeroxResponse, }; @@ -41,7 +41,7 @@ pub async fn wildcard_test( target_url: &str, bar: ProgressBar, tx_term: UnboundedSender, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Option { log::trace!( "enter: wildcard_test({:?}, {:?}, {:?}, {:?})", @@ -147,7 +147,7 @@ async fn make_wildcard_request( target_url: &str, length: usize, tx_file: UnboundedSender, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Option { log::trace!( "enter: make_wildcard_request({}, {}, {:?}, {:?})", @@ -220,7 +220,7 @@ async fn make_wildcard_request( /// Any urls that are found to be alive are returned to the caller. pub async fn connectivity_test( target_urls: &[String], - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Vec { log::trace!( "enter: connectivity_test({:?}, {:?})", diff --git a/src/lib.rs b/src/lib.rs index 1921c8c..021a7f5 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,7 +1,7 @@ pub mod banner; pub mod config; mod client; -mod event_handlers; +pub mod event_handlers; mod filters; pub mod heuristics; pub mod logger; @@ -16,6 +16,7 @@ pub mod utils; mod extractor; use crate::{ + event_handlers::Command, traits::FeroxSerialize, utils::{fmt_err, get_url_path_length, status_colorizer}, }; @@ -34,6 +35,9 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; /// Generic Result type to ease error handling in async contexts pub type FeroxResult = std::result::Result>; +/// Alias for UnboundedSender +pub type CommandSender = UnboundedSender; + /// Simple Error implementation to allow for custom error returns #[derive(Debug, Default)] pub struct FeroxError { diff --git a/src/main.rs b/src/main.rs index f2b12fd..815abdf 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,20 +1,19 @@ -use anyhow::{bail, Context, Result}; +use anyhow::{bail, Result}; use crossterm::event::{self, Event, KeyCode}; use feroxbuster::{ banner::{Banner, UPDATE_URL}, config::{CONFIGURATION, PROGRESS_BAR, PROGRESS_PRINTER}, + event_handlers::{ + Command::{self, CreateBar, Exit, LoadStats, UpdateUsizeField}, + FiltersHandler, Handles, StatsHandler, Tasks, + }, heuristics, logger, progress::{add_bar, BarType}, reporter, scan_manager::{self, ScanStatus, PAUSE_SCAN}, scanner::{self, scan_url, SCANNED_URLS}, - statistics::{ - self, - StatCommand::{self, CreateBar, LoadStats, UpdateUsizeField}, - StatField::InitialTargets, - Stats, - }, - update_stat, + send_command, + statistics::{StatField::InitialTargets, Stats}, utils::{ferox_print, fmt_err, get_current_depth, status_colorizer}, FeroxError, FeroxResponse, FeroxResult, SLEEP_DURATION, }; @@ -34,7 +33,7 @@ use std::{ thread::sleep, time::Duration, }; -use tokio::{io, sync::mpsc::UnboundedSender, task::JoinHandle}; +use tokio::{io, sync::mpsc::UnboundedSender}; use tokio_util::codec::{FramedRead, LinesCodec}; /// Atomic boolean flag, used to determine whether or not the terminal input handler should exit @@ -73,18 +72,10 @@ fn terminal_input_handler() { } /// Create a HashSet of Strings from the given wordlist then stores it inside an Arc -fn get_unique_words_from_wordlist(path: &str) -> FeroxResult>> { +fn get_unique_words_from_wordlist(path: &str) -> Result>> { log::trace!("enter: get_unique_words_from_wordlist({})", path); - let file = match File::open(&path) { - Ok(f) => f, - Err(e) => { - log::error!("Could not open wordlist: {}", e); - log::trace!("exit: get_unique_words_from_wordlist -> {}", e); - - return Err(Box::new(e)); - } - }; + let file = File::open(&path)?; let reader = BufReader::new(file); @@ -111,18 +102,16 @@ fn get_unique_words_from_wordlist(path: &str) -> FeroxResult /// Determine whether it's a single url scan or urls are coming from stdin, then scan as needed async fn scan( targets: Vec, - stats: Arc, tx_term: UnboundedSender, tx_file: UnboundedSender, - tx_stats: UnboundedSender, -) -> FeroxResult<()> { + handles: Handles, +) -> Result<()> { log::trace!( - "enter: scan({:?}, {:?}, {:?}, {:?}, {:?})", + "enter: scan({:?}, {:?}, {:?}, {:?})", targets, - stats, tx_term, tx_file, - tx_stats + handles ); // cloning an Arc is cheap (it's basically a pointer into the heap) // so that will allow for cheap/safe sharing of a single wordlist across multi-target scans @@ -132,14 +121,10 @@ async fn scan( .await??; if words.len() == 0 { - let err = FeroxError { - message: format!("Did not find any words in {}", CONFIGURATION.wordlist), - }; - - return Err(Box::new(err)); + bail!("Did not find any words in {}", CONFIGURATION.wordlist); } - scanner::initialize(words.len(), &CONFIGURATION, tx_stats.clone()).await; + scanner::initialize(words.len(), &CONFIGURATION, handles.stats.tx.clone()).await; // at this point, the stat thread's progress bar can be created; things that needed to happen // first: @@ -147,10 +132,12 @@ async fn scan( // - scanner initialized (this sent expected requests per directory to the stats thread, which // having been set, makes it so the progress bar doesn't flash as full before anything has // even happened - update_stat!(tx_stats, CreateBar); + handles.stats.send(CreateBar)?; if CONFIGURATION.resumed { - update_stat!(tx_stats, LoadStats(CONFIGURATION.resume_from.clone())); + handles + .stats + .send(LoadStats(CONFIGURATION.resume_from.clone()))?; SCANNED_URLS.print_known_responses(); @@ -177,8 +164,8 @@ async fn scan( let word_clone = words.clone(); let term_clone = tx_term.clone(); let file_clone = tx_file.clone(); - let tx_stats_clone = tx_stats.clone(); - let stats_clone = stats.clone(); + let tx_stats_clone = handles.stats.tx.clone(); + let stats_clone = handles.stats.data.clone(); let task = tokio::spawn(async move { let base_depth = get_current_depth(&target); @@ -260,13 +247,15 @@ async fn wrapped_main() -> Result<()> { PROGRESS_BAR.join().unwrap(); }); - let (stats, tx_stats, stats_handle) = statistics::initialize(); + let (stats_task, stats_handle) = StatsHandler::initialize(); + let (filters_task, filters_handle) = FiltersHandler::initialize(); + let handles = Handles::new(stats_handle, filters_handle); if !CONFIGURATION.time_limit.is_empty() { // --time-limit value not an empty string, need to kick off the thread that enforces // the limit - let max_time_stats = stats.clone(); + let max_time_stats = handles.stats.data.clone(); tokio::spawn(async move { scan_manager::start_max_time_thread(&CONFIGURATION.time_limit, max_time_stats).await @@ -285,32 +274,29 @@ async fn wrapped_main() -> Result<()> { if CONFIGURATION.save_state { // start the ctrl+c handler - scan_manager::initialize(stats.clone()); + scan_manager::initialize(handles.stats.data.clone()); } + // todo transmitters here added to handles and made event handlers let (tx_term, tx_file, term_handle, file_handle) = - reporter::initialize(&CONFIGURATION.output, save_output, tx_stats.clone()); + reporter::initialize(&CONFIGURATION.output, save_output, handles.stats.tx.clone()); + + let tasks = Tasks::new(term_handle, file_handle, stats_task, filters_task); // get targets from command line or stdin let targets = match get_targets().await { Ok(t) => t, Err(e) => { // should only happen in the event that there was an error reading from stdin - clean_up( - tx_term, - term_handle, - tx_file, - file_handle, - tx_stats, - stats_handle, - save_output, - ) - .await?; + clean_up(tx_term, tx_file, handles, tasks).await?; bail!("Could not get determine initial targets: {}", e); } }; - update_stat!(tx_stats, UpdateUsizeField(InitialTargets, targets.len())); + send_command!( + handles.stats.tx, + UpdateUsizeField(InitialTargets, targets.len()) + ); if !CONFIGURATION.quiet { // only print banner if -q isn't used @@ -320,74 +306,32 @@ async fn wrapped_main() -> Result<()> { // only interested in the side-effect that sets banner.update_status let _ = banner - .check_for_updates(&CONFIGURATION.client, UPDATE_URL, tx_stats.clone()) + .check_for_updates(&CONFIGURATION.client, UPDATE_URL, handles.stats.tx.clone()) .await; banner.print_to(std_stderr, &CONFIGURATION)?; } // discard non-responsive targets - let live_targets = heuristics::connectivity_test(&targets, tx_stats.clone()).await; + let live_targets = heuristics::connectivity_test(&targets, handles.stats.tx.clone()).await; if live_targets.is_empty() { - clean_up( - tx_term, - term_handle, - tx_file, - file_handle, - tx_stats, - stats_handle, - save_output, - ) - .await?; + clean_up(tx_term, tx_file, handles, tasks).await?; bail!(fmt_err("Could not find any live targets to scan")); } // kick off a scan against any targets determined to be responsive - match scan( + scan( live_targets, - stats, tx_term.clone(), tx_file.clone(), - tx_stats.clone(), - ) - .await - { - Ok(_) => { - log::info!("All scans complete!"); - } - Err(e) => { - // todo status colorizer here and print is likely not needed - ferox_print( - &format!("{} while scanning: {}", status_colorizer("Error"), e), - &PROGRESS_PRINTER, - ); - clean_up( - tx_term, - term_handle, - tx_file, - file_handle, - tx_stats, - stats_handle, - save_output, - ) - .await?; - // todo bail? - process::exit(1); - } - }; - - clean_up( - tx_term, - term_handle, - tx_file, - file_handle, - tx_stats, - stats_handle, - save_output, + handles.clone(), ) .await?; + log::info!("All scans complete!"); + clean_up(tx_term, tx_file, handles, tasks).await?; + log::trace!("exit: wrapped_main"); Ok(()) } @@ -395,54 +339,33 @@ async fn wrapped_main() -> Result<()> { /// Single cleanup function that handles all the necessary drops/finishes etc required to gracefully /// shutdown the program async fn clean_up( - tx_term: UnboundedSender, - term_handle: JoinHandle<()>, + tx_term: UnboundedSender, // todo replace all of these with Handles obj tx_file: UnboundedSender, - file_handle: Option>, - tx_stats: UnboundedSender, - stats_handle: JoinHandle>, - save_output: bool, + handles: Handles, + tasks: Tasks, ) -> Result<()> { log::trace!( - "enter: clean_up({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {})", + "enter: clean_up({:?}, {:?}, {:?})", // todo missing tasks tx_term, - term_handle, tx_file, - file_handle, - tx_stats, - stats_handle, - save_output + handles, ); drop(tx_term); - log::trace!("dropped terminal output handler's transmitter"); + tasks.terminal.await??; - log::trace!("awaiting terminal output handler's receiver"); - // after dropping tx, we can await the future where rx lived - term_handle - .await - .with_context(|| fmt_err("Could not await terminal output handler's receiver"))?; - log::trace!("done awaiting terminal output handler's receiver"); - - log::trace!("tx_file: {:?}", tx_file); // the same drop/await process used on the terminal handler is repeated for the file handler // we drop the file transmitter every time, because it's created no matter what drop(tx_file); - log::trace!("dropped file output handler's transmitter"); - if save_output { - // but we only await if -o was specified - log::trace!("awaiting file output handler's receiver"); - file_handle - .unwrap() - .await - .with_context(|| fmt_err("Could not await file output handler's receiver"))?; - log::trace!("done awaiting file output handler's receiver"); + if let Some(file_task) = tasks.file { + file_task.await??; } - update_stat!(tx_stats, StatCommand::Exit); // send exit command and await the end of the future - stats_handle - .await? - .with_context(|| fmt_err("Could not await statistics handler's receiver"))?; + send_command!(handles.filters.tx, Exit); + tasks.filters.await??; + + send_command!(handles.stats.tx, Exit); + tasks.stats.await??; // mark all scans complete so the terminal input handler will exit cleanly SCAN_COMPLETE.store(true, Ordering::Relaxed); @@ -451,7 +374,7 @@ async fn clean_up( // the final trace messages above PROGRESS_PRINTER.finish(); - drop(tx_stats); + // drop(tx_stats); log::trace!("exit: clean_up"); Ok(()) diff --git a/src/reporter.rs b/src/reporter.rs index ebf1dc7..7fdca85 100644 --- a/src/reporter.rs +++ b/src/reporter.rs @@ -1,15 +1,13 @@ use crate::{ config::{CONFIGURATION, PROGRESS_PRINTER}, + event_handlers::Command::{self, UpdateUsizeField}, scanner::RESPONSES, - statistics::{ - StatCommand::{self, UpdateUsizeField}, - StatField::ResourcesDiscovered, - }, - update_stat, + send_command, + statistics::StatField::ResourcesDiscovered, utils::{ferox_print, make_request, open_file}, FeroxChannel, FeroxResponse, FeroxSerialize, }; - +use anyhow::Result; use console::strip_ansi_codes; use std::{ fs, io, @@ -52,12 +50,12 @@ pub fn get_cached_file_handle(filename: &str) -> Option, + tx_stats: UnboundedSender, ) -> ( UnboundedSender, UnboundedSender, - JoinHandle<()>, - Option>, + JoinHandle>, + Option>>, ) { log::trace!( "enter: initialize({}, {}, {:?})", @@ -103,9 +101,9 @@ pub fn initialize( async fn spawn_terminal_reporter( mut resp_chan: UnboundedReceiver, file_chan: UnboundedSender, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, save_output: bool, -) { +) -> Result<()> { log::trace!( "enter: spawn_terminal_reporter({:?}, {:?}, {:?}, {})", resp_chan, @@ -125,7 +123,7 @@ async fn spawn_terminal_reporter( // print to stdout ferox_print(&resp.as_str(), &PROGRESS_PRINTER); - update_stat!(tx_stats, UpdateUsizeField(ResourcesDiscovered, 1)); + send_command!(tx_stats, UpdateUsizeField(ResourcesDiscovered, 1)); if save_output { // -o used, need to send the report to be written out to disk @@ -171,6 +169,7 @@ async fn spawn_terminal_reporter( } } log::trace!("exit: spawn_terminal_reporter"); + Ok(()) } /// Spawn a single consumer task (sc side of mpsc) @@ -179,14 +178,14 @@ async fn spawn_terminal_reporter( /// the given reporting criteria async fn spawn_file_reporter( mut report_channel: UnboundedReceiver, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, output_file: &str, -) { +) -> Result<()> { let buffered_file = match get_cached_file_handle(&CONFIGURATION.output) { Some(file) => file, None => { log::trace!("exit: spawn_file_reporter"); - return; + return Ok(()); } }; @@ -202,9 +201,10 @@ async fn spawn_file_reporter( safe_file_write(&response, buffered_file.clone(), CONFIGURATION.json); } - update_stat!(tx_stats, StatCommand::Save); + send_command!(tx_stats, Command::Save); log::trace!("exit: spawn_file_reporter"); + Ok(()) } /// Given a string and a reference to a locked buffered file, write the contents and flush diff --git a/src/scanner.rs b/src/scanner.rs index efd8911..ae18a7d 100644 --- a/src/scanner.rs +++ b/src/scanner.rs @@ -1,5 +1,6 @@ use crate::{ config::{Configuration, CONFIGURATION}, + event_handlers::Command::{self, UpdateF64Field, UpdateUsizeField}, extractor::ExtractorBuilder, filters::{ LinesFilter, RegexFilter, SimilarityFilter, SizeFilter, StatusCodeFilter, WildcardFilter, @@ -7,13 +8,12 @@ use crate::{ }, heuristics, scan_manager::{FeroxResponses, FeroxScans, ScanStatus, PAUSE_SCAN}, + send_command, statistics::{ - StatCommand::{self, UpdateF64Field, UpdateUsizeField}, StatField::{DirScanTimes, ExpectedPerScan, TotalScans, WildcardsFiltered}, Stats, }, traits::FeroxFilter, - update_stat, utils::{format_url, get_current_depth, make_request}, FeroxChannel, FeroxResponse, SIMILARITY_THRESHOLD, }; @@ -112,7 +112,7 @@ fn spawn_recursion_handler( stats: Arc, tx_term: UnboundedSender, tx_file: UnboundedSender, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> BoxFuture<'static, Vec>>> { log::trace!( "enter: spawn_recursion_handler({:?}, wordlist[{} words...], {}, {:?}, {:?}, {:?}, {:?})", @@ -136,7 +136,7 @@ fn spawn_recursion_handler( continue; } - update_stat!(tx_stats, UpdateUsizeField(TotalScans, 1)); + send_command!(tx_stats, UpdateUsizeField(TotalScans, 1)); log::info!("received {} on recursion channel", resp); @@ -186,7 +186,7 @@ fn create_urls( target_url: &str, word: &str, extensions: &[String], - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Vec { log::trace!( "enter: create_urls({}, {}, {:?}, {:?})", @@ -364,7 +364,7 @@ pub async fn try_recursion( /// to the user or not. pub fn should_filter_response( response: &FeroxResponse, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> bool { match FILTERS.read() { Ok(filters) => { @@ -372,7 +372,7 @@ pub fn should_filter_response( // wildcard.should_filter goes here if filter.should_filter_response(&response) { if filter.as_any().downcast_ref::().is_some() { - update_stat!(tx_stats, UpdateUsizeField(WildcardsFiltered, 1)) + send_command!(tx_stats, UpdateUsizeField(WildcardsFiltered, 1)) } return true; } @@ -397,7 +397,7 @@ async fn make_requests( stats: Arc, dir_chan: UnboundedSender, report_chan: UnboundedSender, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) { log::trace!( "enter: make_requests({}, {}, {}, {:?}, {:?}, {:?}, {:?})", @@ -480,7 +480,7 @@ pub async fn scan_url( stats: Arc, tx_term: UnboundedSender, tx_file: UnboundedSender, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) { log::trace!( "enter: scan_url({:?}, wordlist[{} words...], {}, {:?}, {:?}, {:?}, {:?})", @@ -520,7 +520,7 @@ pub async fn scan_url( let _ = extractor.extract().await; } - update_stat!(tx_stats, UpdateUsizeField(TotalScans, 1)); + send_command!(tx_stats, UpdateUsizeField(TotalScans, 1)); // this protection allows us to add the first scanned url to SCANNED_URLS // from within the scan_url function instead of the recursion handler @@ -634,7 +634,7 @@ pub async fn scan_url( producers.await; log::trace!("done awaiting scan producers"); - update_stat!( + send_command!( tx_stats, UpdateF64Field(DirScanTimes, scan_timer.elapsed().as_secs_f64()) ); @@ -663,7 +663,7 @@ pub async fn scan_url( pub async fn initialize( num_words: usize, config: &Configuration, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) { log::trace!( "enter: initialize({}, {:?}, {:?})", @@ -681,7 +681,7 @@ pub async fn initialize( }; // tell Stats object about the number of expected requests - update_stat!( + send_command!( tx_stats, UpdateUsizeField(ExpectedPerScan, num_reqs_expected as usize) ); @@ -791,7 +791,7 @@ mod tests { #[test] /// sending url + word without any extensions should get back one url with the joined word fn create_urls_no_extension_returns_base_url_with_word() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let urls = create_urls("http://localhost", "turbo", &[], tx); assert_eq!(urls, [Url::parse("http://localhost/turbo").unwrap()]) } @@ -799,7 +799,7 @@ mod tests { #[test] /// sending url + word + 1 extension should get back two urls, one base and one with extension fn create_urls_one_extension_returns_two_urls() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let urls = create_urls("http://localhost", "turbo", &[String::from("js")], tx); assert_eq!( urls, @@ -838,7 +838,7 @@ mod tests { vec![base, js, php, pdf, tar], ]; - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); for (i, ext_set) in ext_vec.into_iter().enumerate() { let urls = create_urls("http://localhost", "turbo", &ext_set, tx.clone()); @@ -894,7 +894,7 @@ mod tests { filter_regex: vec![r"(".to_string()], ..Default::default() }; - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); initialize(1, &config, tx).await; } } diff --git a/src/statistics/container.rs b/src/statistics/container.rs index 587e25a..3d45bec 100644 --- a/src/statistics/container.rs +++ b/src/statistics/container.rs @@ -414,9 +414,9 @@ mod tests { async fn statistics_handler_increments_requests() { let (stats, tx, handle) = setup_stats_test(); - tx.send(StatCommand::AddRequest).unwrap_or_default(); - tx.send(StatCommand::AddRequest).unwrap_or_default(); - tx.send(StatCommand::AddRequest).unwrap_or_default(); + tx.send(Command::AddRequest).unwrap_or_default(); + tx.send(Command::AddRequest).unwrap_or_default(); + tx.send(Command::AddRequest).unwrap_or_default(); teardown_stats_test(tx, handle).await; @@ -433,8 +433,8 @@ mod tests { async fn statistics_handler_increments_403() { let (stats, tx, handle) = setup_stats_test(); - let err = StatCommand::AddError(StatError::Status403); - let err2 = StatCommand::AddError(StatError::Status403); + let err = Command::AddError(StatError::Status403); + let err2 = Command::AddError(StatError::Status403); tx.send(err).unwrap_or_default(); tx.send(err2).unwrap_or_default(); @@ -456,8 +456,8 @@ mod tests { async fn statistics_handler_increments_403_via_status_code() { let (stats, tx, handle) = setup_stats_test(); - let err = StatCommand::AddStatus(reqwest::StatusCode::FORBIDDEN); - let err2 = StatCommand::AddStatus(reqwest::StatusCode::FORBIDDEN); + let err = Command::AddStatus(reqwest::StatusCode::FORBIDDEN); + let err2 = Command::AddStatus(reqwest::StatusCode::FORBIDDEN); tx.send(err).unwrap_or_default(); tx.send(err2).unwrap_or_default(); @@ -477,8 +477,8 @@ mod tests { async fn statistics_handler_increments_500_via_status_code() { let (stats, tx, handle) = setup_stats_test(); - let err = StatCommand::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR); - let err2 = StatCommand::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR); + let err = Command::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR); + let err2 = Command::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR); tx.send(err).unwrap_or_default(); tx.send(err2).unwrap_or_default(); diff --git a/src/statistics/init.rs b/src/statistics/init.rs index dbb60e0..8b13789 100644 --- a/src/statistics/init.rs +++ b/src/statistics/init.rs @@ -1,34 +1 @@ -use super::{command::StatCommand, container::Stats}; -use crate::{event_handlers::StatsHandler, FeroxChannel}; -use anyhow::Result; -use std::sync::Arc; -use tokio::{ - sync::mpsc::{self, UnboundedSender}, - task::JoinHandle, -}; -/// Initialize new `Stats` object and the sc side of an mpsc channel that is responsible for -/// updates to the aforementioned object. -pub fn initialize() -> ( - Arc, - UnboundedSender, - JoinHandle>, -) { - log::trace!("enter: initialize"); - - let stats_tracker = Arc::new(Stats::new()); - let (tx_stats, rx_stats): FeroxChannel = mpsc::unbounded_channel(); - - let mut handler = StatsHandler::new(stats_tracker.clone(), rx_stats); - - let stats_thread = tokio::spawn(async move { handler.start().await }); - - log::trace!( - "exit: initialize -> ({:?}, {:?}, {:?})", - stats_tracker, - tx_stats, - stats_thread - ); - - (stats_tracker, tx_stats, stats_thread) -} diff --git a/src/statistics/mod.rs b/src/statistics/mod.rs index a54823c..d699a1d 100644 --- a/src/statistics/mod.rs +++ b/src/statistics/mod.rs @@ -1,17 +1,13 @@ mod error; mod macros; mod container; -mod command; mod field; -mod init; #[cfg(test)] mod tests; -pub use self::command::StatCommand; pub use self::container::Stats; pub use self::error::StatError; pub use self::field::StatField; -pub use self::init::initialize; #[cfg(test)] use self::tests::{setup_stats_test, teardown_stats_test}; diff --git a/src/statistics/tests.rs b/src/statistics/tests.rs index 419ae65..faa450e 100644 --- a/src/statistics/tests.rs +++ b/src/statistics/tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::FeroxSerialize; +use crate::{event_handlers::StatsHandler, FeroxSerialize}; use anyhow::Result; use reqwest::StatusCode; use std::sync::Arc; @@ -7,22 +7,15 @@ use tempfile::NamedTempFile; use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle}; /// simple helper to reduce code reuse -pub fn setup_stats_test() -> ( - Arc, - UnboundedSender, - JoinHandle>, -) { - initialize() +pub fn setup_stats_test() -> (Arc, UnboundedSender, JoinHandle>) { + StatsHandler::initialize() } /// another helper to stay DRY; must be called after any sent commands and before any checks /// performed against the Stats object -pub async fn teardown_stats_test( - sender: UnboundedSender, - handle: JoinHandle>, -) { +pub async fn teardown_stats_test(sender: UnboundedSender, handle: JoinHandle>) { // send exit and await, once the await completes, stats should be updated - sender.send(StatCommand::Exit).unwrap_or_default(); + sender.send(Command::Exit).unwrap_or_default(); handle.await.unwrap().unwrap(); } @@ -31,7 +24,7 @@ pub async fn teardown_stats_test( async fn statistics_handler_exits() { let (_, sender, handle) = setup_stats_test(); - sender.send(StatCommand::Exit).unwrap_or_default(); + sender.send(Command::Exit).unwrap_or_default(); handle.await.unwrap().unwrap(); // blocks on the handler's while loop diff --git a/src/traits.rs b/src/traits.rs index cae8740..69e148c 100644 --- a/src/traits.rs +++ b/src/traits.rs @@ -41,3 +41,6 @@ pub trait FeroxSerialize: Serialize { /// Return an NDJSON representation of the object fn as_json(&self) -> Result; } + +/// todo doc +pub trait HandlerCommand {} diff --git a/src/utils.rs b/src/utils.rs index c021ec1..47b0f69 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,10 +1,8 @@ #![macro_use] use crate::{ config::{CONFIGURATION, PROGRESS_PRINTER}, - statistics::{ - StatCommand::{self, AddError, AddStatus}, - StatError::{Connection, Other, Redirection, Request, Timeout, UrlFormat}, - }, + event_handlers::Command::{self, AddError, AddStatus}, + statistics::StatError::{Connection, Other, Redirection, Request, Timeout, UrlFormat}, FeroxError, }; use anyhow::{bail, Context, Result}; @@ -168,7 +166,7 @@ pub fn ferox_print(msg: &str, bar: &ProgressBar) { #[macro_export] /// wrapper to improve code readability -macro_rules! update_stat { +macro_rules! send_command { ($tx:expr, $value:expr) => { $tx.send($value).unwrap_or_default(); }; @@ -183,7 +181,7 @@ pub fn format_url( add_slash: bool, queries: &[(String, String)], extension: Option<&str>, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Result { log::trace!( "enter: format_url({}, {}, {}, {:?} {:?}, {:?})", @@ -211,7 +209,7 @@ pub fn format_url( let err = FeroxError { message }; - update_stat!(tx_stats, AddError(UrlFormat)); + send_command!(tx_stats, AddError(UrlFormat)); log::trace!("exit: format_url -> {}", err); bail!("{}", err); @@ -281,7 +279,7 @@ pub fn format_url( } } Err(e) => { - update_stat!(tx_stats, AddError(UrlFormat)); + send_command!(tx_stats, AddError(UrlFormat)); log::trace!("exit: format_url -> {}", e); log::error!("Could not join {} with {}", word, base_url); bail!("{}", e) @@ -293,7 +291,7 @@ pub fn format_url( pub async fn make_request( client: &Client, url: &Url, - tx_stats: UnboundedSender, + tx_stats: UnboundedSender, ) -> Result { log::trace!( "enter: make_request(CONFIGURATION.Client, {}, {:?})", @@ -309,29 +307,29 @@ pub async fn make_request( if e.is_timeout() { // only warn for timeouts, while actual errors are still left as errors log_level = log::Level::Warn; - update_stat!(tx_stats, AddError(Timeout)); + send_command!(tx_stats, AddError(Timeout)); } else if e.is_redirect() { if let Some(last_redirect) = e.url() { // get where we were headed (last_redirect) and where we came from (url) let fancy_message = format!("{} !=> {}", url, last_redirect); let report = if let Some(msg_status) = e.status() { - update_stat!(tx_stats, AddStatus(msg_status)); + send_command!(tx_stats, AddStatus(msg_status)); create_report_string(msg_status.as_str(), "-1", "-1", "-1", &fancy_message) } else { create_report_string("UNK", "-1", "-1", "-1", &fancy_message) }; - update_stat!(tx_stats, AddError(Redirection)); + send_command!(tx_stats, AddError(Redirection)); ferox_print(&report, &PROGRESS_PRINTER) }; } else if e.is_connect() { - update_stat!(tx_stats, AddError(Connection)); + send_command!(tx_stats, AddError(Connection)); } else if e.is_request() { - update_stat!(tx_stats, AddError(Request)); + send_command!(tx_stats, AddError(Request)); } else { - update_stat!(tx_stats, AddError(Other)); + send_command!(tx_stats, AddError(Other)); } if matches!(log_level, log::Level::Error) { @@ -344,7 +342,7 @@ pub async fn make_request( } Ok(resp) => { log::trace!("exit: make_request -> {:?}", resp); - update_stat!(tx_stats, AddStatus(resp.status())); + send_command!(tx_stats, AddStatus(resp.status())); Ok(resp) } } @@ -513,7 +511,7 @@ mod tests { #[test] /// base url + 1 word + no slash + no extension fn format_url_normal() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); assert_eq!( format_url("http://localhost", "stuff", false, &Vec::new(), None, tx).unwrap(), reqwest::Url::parse("http://localhost/stuff").unwrap() @@ -523,7 +521,7 @@ mod tests { #[test] /// base url + no word + no slash + no extension fn format_url_no_word() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); assert_eq!( format_url("http://localhost", "", false, &Vec::new(), None, tx).unwrap(), reqwest::Url::parse("http://localhost").unwrap() @@ -533,7 +531,7 @@ mod tests { #[test] /// base url + word + no slash + no extension + queries fn format_url_joins_queries() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); assert_eq!( format_url( "http://localhost", @@ -551,7 +549,7 @@ mod tests { #[test] /// base url + no word + no slash + no extension + queries fn format_url_without_word_joins_queries() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); assert_eq!( format_url( "http://localhost", @@ -570,14 +568,14 @@ mod tests { #[should_panic] /// no base url is an error fn format_url_no_url() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); format_url("", "stuff", false, &Vec::new(), None, tx).unwrap(); } #[test] /// word prepended with slash is adjusted for correctness fn format_url_word_with_preslash() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); assert_eq!( format_url("http://localhost", "/stuff", false, &Vec::new(), None, tx).unwrap(), reqwest::Url::parse("http://localhost/stuff").unwrap() @@ -587,7 +585,7 @@ mod tests { #[test] /// word with appended slash allows the slash to persist fn format_url_word_with_postslash() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); assert_eq!( format_url("http://localhost", "stuff/", false, &Vec::new(), None, tx).unwrap(), reqwest::Url::parse("http://localhost/stuff/").unwrap() @@ -597,7 +595,7 @@ mod tests { #[test] /// word with two prepended slashes doesn't discard the entire domain fn format_url_word_with_two_prepended_slashes() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let result = format_url( "http://localhost", @@ -618,7 +616,7 @@ mod tests { #[test] /// word that is a fully formed url, should return an error fn format_url_word_that_is_a_url() { - let (tx, _): FeroxChannel = mpsc::unbounded_channel(); + let (tx, _): FeroxChannel = mpsc::unbounded_channel(); let url = format_url( "http://localhost", "http://schmocalhost",