work in progress, incremental save

This commit is contained in:
epi
2021-01-17 13:46:54 -06:00
parent 4ff943fe9f
commit 5fbf554282
25 changed files with 501 additions and 307 deletions

View File

@@ -1,7 +1,7 @@
use super::entry::BannerEntry;
use crate::{
config::Configuration,
statistics::StatCommand,
event_handlers::Command,
utils::{make_request, status_colorizer},
VERSION,
};
@@ -348,7 +348,7 @@ by Ben "epi" Risher {} ver: {}"#,
&mut self,
client: &Client,
url: &str,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Result<()> {
log::trace!("enter: needs_update({:?}, {}, {:?})", client, url, tx_stats);

View File

@@ -2,7 +2,7 @@ use super::container::UpdateStatus;
use super::*;
use crate::{
config::{Configuration, CONFIGURATION},
statistics::StatCommand,
statistics::Command,
FeroxChannel,
};
use httpmock::Method::GET;
@@ -58,7 +58,7 @@ async fn banner_intialize_without_queries() {
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
/// test that
async fn banner_needs_update_returns_unknown_with_bad_url() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut banner = Banner::new(&[String::from("http://localhost")], &CONFIGURATION);
@@ -79,7 +79,7 @@ async fn banner_needs_update_returns_up_to_date() {
then.status(200).body("{\"tag_name\":\"v1.1.0\"}");
});
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION);
banner.version = String::from("1.1.0");
@@ -102,7 +102,7 @@ async fn banner_needs_update_returns_out_of_date() {
then.status(200).body("{\"tag_name\":\"v1.1.0\"}");
});
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION);
banner.version = String::from("1.0.1");
@@ -127,7 +127,7 @@ async fn banner_needs_update_returns_unknown_on_timeout() {
.delay(Duration::from_secs(8));
});
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION);
@@ -149,7 +149,7 @@ async fn banner_needs_update_returns_unknown_on_bad_json_response() {
then.status(200).body("not json");
});
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION);
@@ -172,7 +172,7 @@ async fn banner_needs_update_returns_unknown_on_json_without_correct_tag() {
.body("{\"no tag_name\": \"doesn't exist\"}");
});
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut banner = Banner::new(&[srv.url("")], &CONFIGURATION);
banner.version = String::from("1.0.1");

View File

@@ -0,0 +1,41 @@
// use crate::event_handlers::Handlers;
// use crate::filters::FilterCommand;
// use crate::statistics::StatCommand;
// use crate::traits::HandlerCommand;
// use tokio::sync::mpsc::UnboundedSender;
//
// /// todo
// #[derive(Default)]
// pub struct HandlersBuilder<H> {
// transmitters: Vec<UnboundedSender<H>>,
// }
//
// /// todo
// impl<H: HandlerCommand> HandlersBuilder<H> {
// /// todo
// pub fn new() -> Self {
// Self {
// ..Default::default()
// }
// }
//
// /// todo
// pub fn transmitter(&mut self, transmitter: UnboundedSender<H>) -> &mut Self {
// self.transmitters.push(transmitter);
// self
// }
//
// /// todo
// pub fn build(&self) -> Handlers {
// // let tx_stats: UnboundedSender<StatCommand> = self.transmitters[0];
// // let tx_filters: UnboundedSender<FilterCommand> = self.transmitters[1];
//
// for tx in self.transmitters {}
//
// Handlers {
// handles: Vec::new(),
// tx_stats,
// tx_filters,
// }
// }
// }

View File

@@ -1,9 +1,10 @@
use super::{error::StatError, field::StatField};
use crate::statistics::{StatError, StatField};
use crate::traits::FeroxFilter;
use reqwest::StatusCode;
/// Protocol definition for updating a Stats object via mpsc
/// Protocol definition for updating an event handler via mpsc
#[derive(Debug)]
pub enum StatCommand {
pub enum Command {
/// Add one to the total number of requests
AddRequest,
@@ -28,6 +29,9 @@ pub enum StatCommand {
/// Load a `Stats` object from disk
LoadStats(String),
/// Add a FeroxFilter implementor to FilterHandler's instance of FeroxFilters
AddFilter(Box<dyn FeroxFilter>),
/// Break out of the (infinite) mpsc receive loop
Exit,
}

View File

@@ -0,0 +1,151 @@
use super::Command;
use crate::{filters::FeroxFilters, statistics::Stats, CommandSender};
use anyhow::Result;
use std::sync::Arc;
use tokio::task::JoinHandle;
type Joiner = JoinHandle<Result<()>>;
pub struct Tasks {
pub terminal: Joiner,
pub file: Option<Joiner>,
pub stats: Joiner,
pub filters: Joiner,
}
impl Tasks {
pub fn new(terminal: Joiner, file: Option<Joiner>, stats: Joiner, filters: Joiner) -> Self {
Self {
terminal,
file,
stats,
filters,
}
}
}
#[derive(Clone, Debug)]
pub struct StatsHandle {
pub data: Arc<Stats>,
pub tx: CommandSender,
}
impl StatsHandle {
pub fn new(data: Arc<Stats>, tx: CommandSender) -> Self {
Self { data, tx }
}
pub fn send(&self, command: Command) -> Result<()> {
self.tx.send(command)?;
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct FiltersHandle {
pub data: Arc<FeroxFilters>,
pub tx: CommandSender,
}
impl FiltersHandle {
pub fn new(data: Arc<FeroxFilters>, tx: CommandSender) -> Self {
Self { data, tx }
}
pub fn send(&self, command: Command) -> Result<()> {
self.tx.send(command)?;
Ok(())
}
}
// #[derive(Clone, Debug)]
// pub struct ReporterTerminalHandle {
// pub data: Arc<Stats>,
// pub task: JoinHandle<Result<()>>,
// pub transmitter: CommandSender,
// }
//
// impl StatsHandle {
// pub fn new(data: Arc<Stats>, transmitter: CommandSender, task: JoinHandle<Result<()>>) -> Self {
// Self {
// data,
// task,
// transmitter,
// }
// }
// }
// #[derive(Clone, Debug)]
// pub enum EventHandle {
// Stats(StatsHandle),
// Filters(FiltersHandle),
// }
//
// impl EventHandle {
// pub fn send(&self, cmd: Command) -> Result<()> {
// if let EventHandle::Filters(handle) = self {
// send_command!(handle.transmitter, cmd);
// }
// Ok(())
// }
//
//
// pub fn stats(&self) -> Result<&StatsHandle> {
// match self {
// EventHandle::Stats(handle) => Ok(handle),
// _ => {
// bail!("no underlying StatsHandle found")
// }
// }
// }
//
// pub fn filters(&self) -> Result<&FiltersHandle> {
// match self {
// EventHandle::Filters(handle) => Ok(handle),
// _ => {
// bail!("no underlying FiltersHandle found")
// }
// }
// }
// }
// todo need to move these into their proper places
/// todo docs everywhere in this file
// /// todo docs if this pans out
#[derive(Clone, Debug)]
pub struct Handles {
pub stats: StatsHandle,
pub filters: FiltersHandle,
}
impl Handles {
pub fn new(stats: StatsHandle, filters: FiltersHandle) -> Self {
Self { stats, filters }
}
// /// todo
// ///
// /// expected order of task completion
// /// - terminal
// /// - file (if present)
// /// - filters
// /// - stats
// pub async fn clean_up(self) -> Result<()> {
// // todo trace
//
// send_command!(self.filters.tx, Command::Exit); // send exit command and await the end of the future
// self.filters
// .task
// .await
// .with_context(|| fmt_err("Could not await a filters handler's receiver"))?;
//
// send_command!(self.stats.tx, Command::Exit);
// self.stats
// .task
// .await
// .with_context(|| fmt_err("Could not await a stats handler's receiver"))?;
//
// Ok(())
// }
}

View File

@@ -0,0 +1,66 @@
use super::*;
use crate::{filters::FeroxFilters, FeroxChannel};
use anyhow::Result;
use std::sync::Arc;
use tokio::{
sync::mpsc::{self, UnboundedReceiver},
task::JoinHandle,
};
/// event handler for updating a single data structure of all active filters
#[derive(Debug)]
pub struct FiltersHandler {
/// collection of generic type `T` where `T` is some collection of data
data: Arc<FeroxFilters>,
/// Receiver half of mpsc from which `Command`s are processed
receiver: UnboundedReceiver<Command>,
}
/// implementation of event handler for filters
impl FiltersHandler {
/// create new event handler
pub fn new(data: Arc<FeroxFilters>, receiver: UnboundedReceiver<Command>) -> Self {
Self { data, receiver }
}
/// Initialize new `FeroxFilters` and the sc side of an mpsc channel that is responsible for
/// updates to the aforementioned object.
pub fn initialize() -> (JoinHandle<Result<()>>, FiltersHandle) {
log::trace!("enter: initialize");
let data = Arc::new(FeroxFilters::default());
let (tx, rx): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut handler = Self::new(data.clone(), rx);
let task = tokio::spawn(async move { handler.start().await });
let event_handle = FiltersHandle::new(data, tx);
log::trace!("exit: initialize -> ({:?})", event_handle);
(task, event_handle)
}
/// Start a single consumer task (sc side of mpsc)
///
/// The consumer simply receives `Command` and acts accordingly
pub async fn start(&mut self) -> Result<()> {
log::trace!("enter: start({:?})", self);
while let Some(command) = self.receiver.recv().await {
match command {
Command::AddFilter(filter) => {
self.data.push(filter)?;
}
Command::Exit => break,
_ => {} // no other commands needed for FilterHandler
}
}
log::trace!("exit: start");
Ok(())
}
}

View File

@@ -1,3 +1,11 @@
//! collection of event handlers (typically long-running tokio spawned tasks)
mod statistics;
pub use statistics::StatsHandler;
mod filters;
mod container;
mod builder;
mod command;
pub use self::command::Command;
pub use self::container::{FiltersHandle, Handles, StatsHandle, Tasks};
pub use self::filters::FiltersHandler;
pub use self::statistics::StatsHandler;

View File

@@ -1,13 +1,16 @@
use super::*;
use crate::{
config::CONFIGURATION,
progress::{add_bar, BarType},
statistics::{StatCommand, StatField, Stats},
statistics::{StatField, Stats},
FeroxChannel,
};
use anyhow::Result;
use console::style;
use indicatif::ProgressBar;
use std::{sync::Arc, time::Instant};
use tokio::sync::mpsc::UnboundedReceiver;
use tokio::sync::mpsc::{self, UnboundedReceiver};
use tokio::task::JoinHandle;
/// event handler struct for updating statistics
#[derive(Debug)]
@@ -16,7 +19,7 @@ pub struct StatsHandler {
bar: ProgressBar,
/// Receiver half of mpsc from which `StatCommand`s are processed
receiver: UnboundedReceiver<StatCommand>,
receiver: UnboundedReceiver<Command>,
/// data class that stores all statistics updates
stats: Arc<Stats>,
@@ -24,8 +27,8 @@ pub struct StatsHandler {
/// implementation of event handler for statistics
impl StatsHandler {
/// create new event handler builder
pub fn new(stats: Arc<Stats>, rx_stats: UnboundedReceiver<StatCommand>) -> Self {
/// create new event handler
fn new(stats: Arc<Stats>, rx_stats: UnboundedReceiver<Command>) -> Self {
// will be updated later via StatCommand; delay is for banner to print first
let bar = ProgressBar::hidden();
@@ -39,30 +42,30 @@ impl StatsHandler {
/// Start a single consumer task (sc side of mpsc)
///
/// The consumer simply receives `StatCommands` and updates the given `Stats` object as appropriate
pub async fn start(&mut self) -> Result<()> {
async fn start(&mut self) -> Result<()> {
log::trace!("enter: start({:?})", self);
let start = Instant::now();
while let Some(command) = self.receiver.recv().await {
match command as StatCommand {
StatCommand::AddError(err) => {
match command as Command {
Command::AddError(err) => {
self.stats.add_error(err);
self.increment_bar();
}
StatCommand::AddStatus(status) => {
Command::AddStatus(status) => {
self.stats.add_status_code(status);
self.increment_bar();
}
StatCommand::AddRequest => {
Command::AddRequest => {
self.stats.add_request();
self.increment_bar();
}
StatCommand::Save => {
Command::Save => {
self.stats
.save(start.elapsed().as_secs_f64(), &CONFIGURATION.output)?;
}
StatCommand::UpdateUsizeField(field, value) => {
Command::UpdateUsizeField(field, value) => {
let update_len = matches!(field, StatField::TotalScans);
self.stats.update_usize_field(field, value);
@@ -70,16 +73,15 @@ impl StatsHandler {
self.bar.set_length(self.stats.total_expected() as u64)
}
}
StatCommand::UpdateF64Field(field, value) => {
self.stats.update_f64_field(field, value)
}
StatCommand::CreateBar => {
Command::UpdateF64Field(field, value) => self.stats.update_f64_field(field, value),
Command::CreateBar => {
self.bar = add_bar("", self.stats.total_expected() as u64, BarType::Total);
}
StatCommand::LoadStats(filename) => {
Command::LoadStats(filename) => {
self.stats.merge_from(&filename)?;
}
StatCommand::Exit => break,
Command::Exit => break,
_ => {} // no more commands needed
}
}
@@ -103,4 +105,23 @@ impl StatsHandler {
self.bar.set_message(&msg);
self.bar.inc(1);
}
/// Initialize new `Stats` object and the sc side of an mpsc channel that is responsible for
/// updates to the aforementioned object.
pub fn initialize() -> (JoinHandle<Result<()>>, StatsHandle) {
log::trace!("enter: initialize");
let data = Arc::new(Stats::new());
let (tx, rx): FeroxChannel<Command> = mpsc::unbounded_channel();
let mut handler = StatsHandler::new(data.clone(), rx);
let task = tokio::spawn(async move { handler.start().await });
let event_handle = StatsHandle::new(data, tx);
log::trace!("exit: initialize -> ({:?})", event_handle);
(task, event_handle)
}
}

View File

@@ -34,7 +34,7 @@ pub struct ExtractorBuilder<'a> {
config: Option<&'a Configuration>,
/// transmitter to the mpsc that handles statistics gathering
tx_stats: Option<UnboundedSender<StatCommand>>,
tx_stats: Option<UnboundedSender<Command>>,
/// transmitter to the mpsc that handles recursive scan calls
tx_recursion: Option<UnboundedSender<String>>,
@@ -106,7 +106,7 @@ impl<'a> ExtractorBuilder<'a> {
}
/// builder call to set `tx_stats`
pub fn stats_transmitter(&mut self, tx_stats: UnboundedSender<StatCommand>) -> &mut Self {
pub fn stats_transmitter(&mut self, tx_stats: UnboundedSender<Command>) -> &mut Self {
self.tx_stats = Some(tx_stats);
self
}

View File

@@ -1,12 +1,10 @@
use super::*;
use crate::{
client,
event_handlers::Command::UpdateUsizeField,
scanner::{send_report, should_filter_response, try_recursion},
statistics::{
StatCommand::UpdateUsizeField,
StatField::{LinksExtracted, TotalExpected},
},
update_stat,
send_command,
statistics::StatField::{LinksExtracted, TotalExpected},
utils::{format_url, make_request},
};
use anyhow::{bail, Context, Result};
@@ -42,7 +40,7 @@ pub struct Extractor<'a> {
pub(super) config: &'a Configuration,
/// transmitter to the mpsc that handles statistics gathering
pub(super) tx_stats: UnboundedSender<StatCommand>,
pub(super) tx_stats: UnboundedSender<Command>,
/// transmitter to the mpsc that handles recursive scan calls
pub(super) tx_recursion: UnboundedSender<String>,
@@ -399,8 +397,8 @@ impl<'a> Extractor<'a> {
fn update_stats(&self, num_links: usize) {
let multiplier = self.config.extensions.len().max(1);
update_stat!(self.tx_stats, UpdateUsizeField(LinksExtracted, num_links));
update_stat!(
send_command!(self.tx_stats, UpdateUsizeField(LinksExtracted, num_links));
send_command!(
self.tx_stats,
UpdateUsizeField(TotalExpected, num_links * multiplier)
);

View File

@@ -9,9 +9,7 @@ pub use self::builder::ExtractorBuilder;
pub use self::container::Extractor;
use crate::{
config::Configuration,
scan_manager::FeroxScans,
statistics::{StatCommand, Stats},
config::Configuration, event_handlers::Command, scan_manager::FeroxScans, statistics::Stats,
FeroxResponse,
};
use regex::Regex;

View File

@@ -44,7 +44,7 @@ fn get_test_response() -> FeroxResponse {
/// creates a single extractor that can be used to test standalone functions
fn setup_extractor(target: ExtractionTarget) -> Extractor<'static> {
let (tx_dir, _): FeroxChannel<String> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let (tx_term, _): FeroxChannel<FeroxResponse> = mpsc::unbounded_channel();
let stats = Arc::new(Stats::new());
@@ -143,7 +143,7 @@ fn extractor_get_sub_paths_from_path_with_an_absolute_word() {
/// test that an ExtractorBuilder without a FeroxResponse and without a URL bails
fn extractor_builder_bails_when_neither_required_field_is_set() {
let (tx_dir, _): FeroxChannel<String> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let (tx_term, _): FeroxChannel<FeroxResponse> = mpsc::unbounded_channel();
let stats = Arc::new(Stats::new());
@@ -167,7 +167,7 @@ fn extractor_with_non_base_url_bails() -> Result<()> {
let link = "admin";
let (tx_dir, _): FeroxChannel<String> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let (tx_term, _): FeroxChannel<FeroxResponse> = mpsc::unbounded_channel();
let stats = Arc::new(Stats::new());
@@ -236,7 +236,7 @@ fn extractor_add_link_to_set_of_links_with_non_base_url() {
/// domain; expect an empty set returned
async fn extractor_get_links_with_absolute_url_that_differs_from_target_domain() -> Result<()> {
let (tx_dir, _): FeroxChannel<String> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let (tx_term, _): FeroxChannel<FeroxResponse> = mpsc::unbounded_channel();
let stats = Arc::new(Stats::new());
@@ -283,7 +283,7 @@ async fn extractor_get_links_with_absolute_url_that_differs_from_target_domain()
/// test that /robots.txt is correctly requested given a base url (happy path)
async fn request_robots_txt_without_proxy() -> Result<()> {
let (tx_dir, _): FeroxChannel<String> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let (tx_term, _): FeroxChannel<FeroxResponse> = mpsc::unbounded_channel();
let stats = Arc::new(Stats::new());
let config = Configuration::new();
@@ -323,7 +323,7 @@ async fn request_robots_txt_without_proxy() -> Result<()> {
/// test that /robots.txt is correctly requested given a base url (happy path) when a proxy is used
async fn request_robots_txt_with_proxy() -> Result<()> {
let (tx_dir, _): FeroxChannel<String> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx_stats, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let (tx_term, _): FeroxChannel<FeroxResponse> = mpsc::unbounded_channel();
let stats = Arc::new(Stats::new());
let mut config = Configuration::new();

21
src/filters/container.rs Normal file
View File

@@ -0,0 +1,21 @@
use super::FeroxFilter;
use anyhow::Result;
use std::sync::Mutex;
/// Container around a collection of `FeroxFilters`s
#[derive(Debug, Default)]
pub struct FeroxFilters {
/// collection of `FeroxFilters`
pub filters: Mutex<Vec<Box<dyn FeroxFilter>>>,
}
/// implementation of FeroxFilter collection
impl FeroxFilters {
/// add a single FeroxFilter to the collection
pub fn push(&self, filter: Box<dyn FeroxFilter>) -> Result<()> {
if let Ok(mut unlocked) = self.filters.lock() {
unlocked.push(filter)
}
Ok(())
}
}

View File

@@ -6,9 +6,11 @@ mod lines;
mod size;
mod regex;
mod similarity;
mod container;
#[cfg(test)]
mod tests;
pub use self::container::FeroxFilters;
pub use self::lines::LinesFilter;
pub use self::regex::RegexFilter;
pub use self::similarity::SimilarityFilter;

View File

@@ -1,8 +1,8 @@
use crate::{
config::{CONFIGURATION, PROGRESS_PRINTER},
event_handlers::Command,
filters::WildcardFilter,
scanner::should_filter_response,
statistics::StatCommand,
utils::{ferox_print, format_url, get_url_path_length, make_request, status_colorizer},
FeroxResponse,
};
@@ -41,7 +41,7 @@ pub async fn wildcard_test(
target_url: &str,
bar: ProgressBar,
tx_term: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Option<WildcardFilter> {
log::trace!(
"enter: wildcard_test({:?}, {:?}, {:?}, {:?})",
@@ -147,7 +147,7 @@ async fn make_wildcard_request(
target_url: &str,
length: usize,
tx_file: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Option<FeroxResponse> {
log::trace!(
"enter: make_wildcard_request({}, {}, {:?}, {:?})",
@@ -220,7 +220,7 @@ async fn make_wildcard_request(
/// Any urls that are found to be alive are returned to the caller.
pub async fn connectivity_test(
target_urls: &[String],
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Vec<String> {
log::trace!(
"enter: connectivity_test({:?}, {:?})",

View File

@@ -1,7 +1,7 @@
pub mod banner;
pub mod config;
mod client;
mod event_handlers;
pub mod event_handlers;
mod filters;
pub mod heuristics;
pub mod logger;
@@ -16,6 +16,7 @@ pub mod utils;
mod extractor;
use crate::{
event_handlers::Command,
traits::FeroxSerialize,
utils::{fmt_err, get_url_path_length, status_colorizer},
};
@@ -34,6 +35,9 @@ use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
/// Generic Result type to ease error handling in async contexts
pub type FeroxResult<T> = std::result::Result<T, Box<dyn error::Error + Send + Sync + 'static>>;
/// Alias for UnboundedSender<Command>
pub type CommandSender = UnboundedSender<Command>;
/// Simple Error implementation to allow for custom error returns
#[derive(Debug, Default)]
pub struct FeroxError {

View File

@@ -1,20 +1,19 @@
use anyhow::{bail, Context, Result};
use anyhow::{bail, Result};
use crossterm::event::{self, Event, KeyCode};
use feroxbuster::{
banner::{Banner, UPDATE_URL},
config::{CONFIGURATION, PROGRESS_BAR, PROGRESS_PRINTER},
event_handlers::{
Command::{self, CreateBar, Exit, LoadStats, UpdateUsizeField},
FiltersHandler, Handles, StatsHandler, Tasks,
},
heuristics, logger,
progress::{add_bar, BarType},
reporter,
scan_manager::{self, ScanStatus, PAUSE_SCAN},
scanner::{self, scan_url, SCANNED_URLS},
statistics::{
self,
StatCommand::{self, CreateBar, LoadStats, UpdateUsizeField},
StatField::InitialTargets,
Stats,
},
update_stat,
send_command,
statistics::{StatField::InitialTargets, Stats},
utils::{ferox_print, fmt_err, get_current_depth, status_colorizer},
FeroxError, FeroxResponse, FeroxResult, SLEEP_DURATION,
};
@@ -34,7 +33,7 @@ use std::{
thread::sleep,
time::Duration,
};
use tokio::{io, sync::mpsc::UnboundedSender, task::JoinHandle};
use tokio::{io, sync::mpsc::UnboundedSender};
use tokio_util::codec::{FramedRead, LinesCodec};
/// Atomic boolean flag, used to determine whether or not the terminal input handler should exit
@@ -73,18 +72,10 @@ fn terminal_input_handler() {
}
/// Create a HashSet of Strings from the given wordlist then stores it inside an Arc
fn get_unique_words_from_wordlist(path: &str) -> FeroxResult<Arc<HashSet<String>>> {
fn get_unique_words_from_wordlist(path: &str) -> Result<Arc<HashSet<String>>> {
log::trace!("enter: get_unique_words_from_wordlist({})", path);
let file = match File::open(&path) {
Ok(f) => f,
Err(e) => {
log::error!("Could not open wordlist: {}", e);
log::trace!("exit: get_unique_words_from_wordlist -> {}", e);
return Err(Box::new(e));
}
};
let file = File::open(&path)?;
let reader = BufReader::new(file);
@@ -111,18 +102,16 @@ fn get_unique_words_from_wordlist(path: &str) -> FeroxResult<Arc<HashSet<String>
/// Determine whether it's a single url scan or urls are coming from stdin, then scan as needed
async fn scan(
targets: Vec<String>,
stats: Arc<Stats>,
tx_term: UnboundedSender<FeroxResponse>,
tx_file: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
) -> FeroxResult<()> {
handles: Handles,
) -> Result<()> {
log::trace!(
"enter: scan({:?}, {:?}, {:?}, {:?}, {:?})",
"enter: scan({:?}, {:?}, {:?}, {:?})",
targets,
stats,
tx_term,
tx_file,
tx_stats
handles
);
// cloning an Arc is cheap (it's basically a pointer into the heap)
// so that will allow for cheap/safe sharing of a single wordlist across multi-target scans
@@ -132,14 +121,10 @@ async fn scan(
.await??;
if words.len() == 0 {
let err = FeroxError {
message: format!("Did not find any words in {}", CONFIGURATION.wordlist),
};
return Err(Box::new(err));
bail!("Did not find any words in {}", CONFIGURATION.wordlist);
}
scanner::initialize(words.len(), &CONFIGURATION, tx_stats.clone()).await;
scanner::initialize(words.len(), &CONFIGURATION, handles.stats.tx.clone()).await;
// at this point, the stat thread's progress bar can be created; things that needed to happen
// first:
@@ -147,10 +132,12 @@ async fn scan(
// - scanner initialized (this sent expected requests per directory to the stats thread, which
// having been set, makes it so the progress bar doesn't flash as full before anything has
// even happened
update_stat!(tx_stats, CreateBar);
handles.stats.send(CreateBar)?;
if CONFIGURATION.resumed {
update_stat!(tx_stats, LoadStats(CONFIGURATION.resume_from.clone()));
handles
.stats
.send(LoadStats(CONFIGURATION.resume_from.clone()))?;
SCANNED_URLS.print_known_responses();
@@ -177,8 +164,8 @@ async fn scan(
let word_clone = words.clone();
let term_clone = tx_term.clone();
let file_clone = tx_file.clone();
let tx_stats_clone = tx_stats.clone();
let stats_clone = stats.clone();
let tx_stats_clone = handles.stats.tx.clone();
let stats_clone = handles.stats.data.clone();
let task = tokio::spawn(async move {
let base_depth = get_current_depth(&target);
@@ -260,13 +247,15 @@ async fn wrapped_main() -> Result<()> {
PROGRESS_BAR.join().unwrap();
});
let (stats, tx_stats, stats_handle) = statistics::initialize();
let (stats_task, stats_handle) = StatsHandler::initialize();
let (filters_task, filters_handle) = FiltersHandler::initialize();
let handles = Handles::new(stats_handle, filters_handle);
if !CONFIGURATION.time_limit.is_empty() {
// --time-limit value not an empty string, need to kick off the thread that enforces
// the limit
let max_time_stats = stats.clone();
let max_time_stats = handles.stats.data.clone();
tokio::spawn(async move {
scan_manager::start_max_time_thread(&CONFIGURATION.time_limit, max_time_stats).await
@@ -285,32 +274,29 @@ async fn wrapped_main() -> Result<()> {
if CONFIGURATION.save_state {
// start the ctrl+c handler
scan_manager::initialize(stats.clone());
scan_manager::initialize(handles.stats.data.clone());
}
// todo transmitters here added to handles and made event handlers
let (tx_term, tx_file, term_handle, file_handle) =
reporter::initialize(&CONFIGURATION.output, save_output, tx_stats.clone());
reporter::initialize(&CONFIGURATION.output, save_output, handles.stats.tx.clone());
let tasks = Tasks::new(term_handle, file_handle, stats_task, filters_task);
// get targets from command line or stdin
let targets = match get_targets().await {
Ok(t) => t,
Err(e) => {
// should only happen in the event that there was an error reading from stdin
clean_up(
tx_term,
term_handle,
tx_file,
file_handle,
tx_stats,
stats_handle,
save_output,
)
.await?;
clean_up(tx_term, tx_file, handles, tasks).await?;
bail!("Could not get determine initial targets: {}", e);
}
};
update_stat!(tx_stats, UpdateUsizeField(InitialTargets, targets.len()));
send_command!(
handles.stats.tx,
UpdateUsizeField(InitialTargets, targets.len())
);
if !CONFIGURATION.quiet {
// only print banner if -q isn't used
@@ -320,74 +306,32 @@ async fn wrapped_main() -> Result<()> {
// only interested in the side-effect that sets banner.update_status
let _ = banner
.check_for_updates(&CONFIGURATION.client, UPDATE_URL, tx_stats.clone())
.check_for_updates(&CONFIGURATION.client, UPDATE_URL, handles.stats.tx.clone())
.await;
banner.print_to(std_stderr, &CONFIGURATION)?;
}
// discard non-responsive targets
let live_targets = heuristics::connectivity_test(&targets, tx_stats.clone()).await;
let live_targets = heuristics::connectivity_test(&targets, handles.stats.tx.clone()).await;
if live_targets.is_empty() {
clean_up(
tx_term,
term_handle,
tx_file,
file_handle,
tx_stats,
stats_handle,
save_output,
)
.await?;
clean_up(tx_term, tx_file, handles, tasks).await?;
bail!(fmt_err("Could not find any live targets to scan"));
}
// kick off a scan against any targets determined to be responsive
match scan(
scan(
live_targets,
stats,
tx_term.clone(),
tx_file.clone(),
tx_stats.clone(),
)
.await
{
Ok(_) => {
log::info!("All scans complete!");
}
Err(e) => {
// todo status colorizer here and print is likely not needed
ferox_print(
&format!("{} while scanning: {}", status_colorizer("Error"), e),
&PROGRESS_PRINTER,
);
clean_up(
tx_term,
term_handle,
tx_file,
file_handle,
tx_stats,
stats_handle,
save_output,
)
.await?;
// todo bail?
process::exit(1);
}
};
clean_up(
tx_term,
term_handle,
tx_file,
file_handle,
tx_stats,
stats_handle,
save_output,
handles.clone(),
)
.await?;
log::info!("All scans complete!");
clean_up(tx_term, tx_file, handles, tasks).await?;
log::trace!("exit: wrapped_main");
Ok(())
}
@@ -395,54 +339,33 @@ async fn wrapped_main() -> Result<()> {
/// Single cleanup function that handles all the necessary drops/finishes etc required to gracefully
/// shutdown the program
async fn clean_up(
tx_term: UnboundedSender<FeroxResponse>,
term_handle: JoinHandle<()>,
tx_term: UnboundedSender<FeroxResponse>, // todo replace all of these with Handles obj
tx_file: UnboundedSender<FeroxResponse>,
file_handle: Option<JoinHandle<()>>,
tx_stats: UnboundedSender<StatCommand>,
stats_handle: JoinHandle<Result<()>>,
save_output: bool,
handles: Handles,
tasks: Tasks,
) -> Result<()> {
log::trace!(
"enter: clean_up({:?}, {:?}, {:?}, {:?}, {:?}, {:?}, {})",
"enter: clean_up({:?}, {:?}, {:?})", // todo missing tasks
tx_term,
term_handle,
tx_file,
file_handle,
tx_stats,
stats_handle,
save_output
handles,
);
drop(tx_term);
log::trace!("dropped terminal output handler's transmitter");
tasks.terminal.await??;
log::trace!("awaiting terminal output handler's receiver");
// after dropping tx, we can await the future where rx lived
term_handle
.await
.with_context(|| fmt_err("Could not await terminal output handler's receiver"))?;
log::trace!("done awaiting terminal output handler's receiver");
log::trace!("tx_file: {:?}", tx_file);
// the same drop/await process used on the terminal handler is repeated for the file handler
// we drop the file transmitter every time, because it's created no matter what
drop(tx_file);
log::trace!("dropped file output handler's transmitter");
if save_output {
// but we only await if -o was specified
log::trace!("awaiting file output handler's receiver");
file_handle
.unwrap()
.await
.with_context(|| fmt_err("Could not await file output handler's receiver"))?;
log::trace!("done awaiting file output handler's receiver");
if let Some(file_task) = tasks.file {
file_task.await??;
}
update_stat!(tx_stats, StatCommand::Exit); // send exit command and await the end of the future
stats_handle
.await?
.with_context(|| fmt_err("Could not await statistics handler's receiver"))?;
send_command!(handles.filters.tx, Exit);
tasks.filters.await??;
send_command!(handles.stats.tx, Exit);
tasks.stats.await??;
// mark all scans complete so the terminal input handler will exit cleanly
SCAN_COMPLETE.store(true, Ordering::Relaxed);
@@ -451,7 +374,7 @@ async fn clean_up(
// the final trace messages above
PROGRESS_PRINTER.finish();
drop(tx_stats);
// drop(tx_stats);
log::trace!("exit: clean_up");
Ok(())

View File

@@ -1,15 +1,13 @@
use crate::{
config::{CONFIGURATION, PROGRESS_PRINTER},
event_handlers::Command::{self, UpdateUsizeField},
scanner::RESPONSES,
statistics::{
StatCommand::{self, UpdateUsizeField},
StatField::ResourcesDiscovered,
},
update_stat,
send_command,
statistics::StatField::ResourcesDiscovered,
utils::{ferox_print, make_request, open_file},
FeroxChannel, FeroxResponse, FeroxSerialize,
};
use anyhow::Result;
use console::strip_ansi_codes;
use std::{
fs, io,
@@ -52,12 +50,12 @@ pub fn get_cached_file_handle(filename: &str) -> Option<Arc<RwLock<io::BufWriter
pub fn initialize(
output_file: &str,
save_output: bool,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> (
UnboundedSender<FeroxResponse>,
UnboundedSender<FeroxResponse>,
JoinHandle<()>,
Option<JoinHandle<()>>,
JoinHandle<Result<()>>,
Option<JoinHandle<Result<()>>>,
) {
log::trace!(
"enter: initialize({}, {}, {:?})",
@@ -103,9 +101,9 @@ pub fn initialize(
async fn spawn_terminal_reporter(
mut resp_chan: UnboundedReceiver<FeroxResponse>,
file_chan: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
save_output: bool,
) {
) -> Result<()> {
log::trace!(
"enter: spawn_terminal_reporter({:?}, {:?}, {:?}, {})",
resp_chan,
@@ -125,7 +123,7 @@ async fn spawn_terminal_reporter(
// print to stdout
ferox_print(&resp.as_str(), &PROGRESS_PRINTER);
update_stat!(tx_stats, UpdateUsizeField(ResourcesDiscovered, 1));
send_command!(tx_stats, UpdateUsizeField(ResourcesDiscovered, 1));
if save_output {
// -o used, need to send the report to be written out to disk
@@ -171,6 +169,7 @@ async fn spawn_terminal_reporter(
}
}
log::trace!("exit: spawn_terminal_reporter");
Ok(())
}
/// Spawn a single consumer task (sc side of mpsc)
@@ -179,14 +178,14 @@ async fn spawn_terminal_reporter(
/// the given reporting criteria
async fn spawn_file_reporter(
mut report_channel: UnboundedReceiver<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
output_file: &str,
) {
) -> Result<()> {
let buffered_file = match get_cached_file_handle(&CONFIGURATION.output) {
Some(file) => file,
None => {
log::trace!("exit: spawn_file_reporter");
return;
return Ok(());
}
};
@@ -202,9 +201,10 @@ async fn spawn_file_reporter(
safe_file_write(&response, buffered_file.clone(), CONFIGURATION.json);
}
update_stat!(tx_stats, StatCommand::Save);
send_command!(tx_stats, Command::Save);
log::trace!("exit: spawn_file_reporter");
Ok(())
}
/// Given a string and a reference to a locked buffered file, write the contents and flush

View File

@@ -1,5 +1,6 @@
use crate::{
config::{Configuration, CONFIGURATION},
event_handlers::Command::{self, UpdateF64Field, UpdateUsizeField},
extractor::ExtractorBuilder,
filters::{
LinesFilter, RegexFilter, SimilarityFilter, SizeFilter, StatusCodeFilter, WildcardFilter,
@@ -7,13 +8,12 @@ use crate::{
},
heuristics,
scan_manager::{FeroxResponses, FeroxScans, ScanStatus, PAUSE_SCAN},
send_command,
statistics::{
StatCommand::{self, UpdateF64Field, UpdateUsizeField},
StatField::{DirScanTimes, ExpectedPerScan, TotalScans, WildcardsFiltered},
Stats,
},
traits::FeroxFilter,
update_stat,
utils::{format_url, get_current_depth, make_request},
FeroxChannel, FeroxResponse, SIMILARITY_THRESHOLD,
};
@@ -112,7 +112,7 @@ fn spawn_recursion_handler(
stats: Arc<Stats>,
tx_term: UnboundedSender<FeroxResponse>,
tx_file: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> BoxFuture<'static, Vec<Arc<JoinHandle<()>>>> {
log::trace!(
"enter: spawn_recursion_handler({:?}, wordlist[{} words...], {}, {:?}, {:?}, {:?}, {:?})",
@@ -136,7 +136,7 @@ fn spawn_recursion_handler(
continue;
}
update_stat!(tx_stats, UpdateUsizeField(TotalScans, 1));
send_command!(tx_stats, UpdateUsizeField(TotalScans, 1));
log::info!("received {} on recursion channel", resp);
@@ -186,7 +186,7 @@ fn create_urls(
target_url: &str,
word: &str,
extensions: &[String],
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Vec<Url> {
log::trace!(
"enter: create_urls({}, {}, {:?}, {:?})",
@@ -364,7 +364,7 @@ pub async fn try_recursion(
/// to the user or not.
pub fn should_filter_response(
response: &FeroxResponse,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> bool {
match FILTERS.read() {
Ok(filters) => {
@@ -372,7 +372,7 @@ pub fn should_filter_response(
// wildcard.should_filter goes here
if filter.should_filter_response(&response) {
if filter.as_any().downcast_ref::<WildcardFilter>().is_some() {
update_stat!(tx_stats, UpdateUsizeField(WildcardsFiltered, 1))
send_command!(tx_stats, UpdateUsizeField(WildcardsFiltered, 1))
}
return true;
}
@@ -397,7 +397,7 @@ async fn make_requests(
stats: Arc<Stats>,
dir_chan: UnboundedSender<String>,
report_chan: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) {
log::trace!(
"enter: make_requests({}, {}, {}, {:?}, {:?}, {:?}, {:?})",
@@ -480,7 +480,7 @@ pub async fn scan_url(
stats: Arc<Stats>,
tx_term: UnboundedSender<FeroxResponse>,
tx_file: UnboundedSender<FeroxResponse>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) {
log::trace!(
"enter: scan_url({:?}, wordlist[{} words...], {}, {:?}, {:?}, {:?}, {:?})",
@@ -520,7 +520,7 @@ pub async fn scan_url(
let _ = extractor.extract().await;
}
update_stat!(tx_stats, UpdateUsizeField(TotalScans, 1));
send_command!(tx_stats, UpdateUsizeField(TotalScans, 1));
// this protection allows us to add the first scanned url to SCANNED_URLS
// from within the scan_url function instead of the recursion handler
@@ -634,7 +634,7 @@ pub async fn scan_url(
producers.await;
log::trace!("done awaiting scan producers");
update_stat!(
send_command!(
tx_stats,
UpdateF64Field(DirScanTimes, scan_timer.elapsed().as_secs_f64())
);
@@ -663,7 +663,7 @@ pub async fn scan_url(
pub async fn initialize(
num_words: usize,
config: &Configuration,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) {
log::trace!(
"enter: initialize({}, {:?}, {:?})",
@@ -681,7 +681,7 @@ pub async fn initialize(
};
// tell Stats object about the number of expected requests
update_stat!(
send_command!(
tx_stats,
UpdateUsizeField(ExpectedPerScan, num_reqs_expected as usize)
);
@@ -791,7 +791,7 @@ mod tests {
#[test]
/// sending url + word without any extensions should get back one url with the joined word
fn create_urls_no_extension_returns_base_url_with_word() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let urls = create_urls("http://localhost", "turbo", &[], tx);
assert_eq!(urls, [Url::parse("http://localhost/turbo").unwrap()])
}
@@ -799,7 +799,7 @@ mod tests {
#[test]
/// sending url + word + 1 extension should get back two urls, one base and one with extension
fn create_urls_one_extension_returns_two_urls() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let urls = create_urls("http://localhost", "turbo", &[String::from("js")], tx);
assert_eq!(
urls,
@@ -838,7 +838,7 @@ mod tests {
vec![base, js, php, pdf, tar],
];
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
for (i, ext_set) in ext_vec.into_iter().enumerate() {
let urls = create_urls("http://localhost", "turbo", &ext_set, tx.clone());
@@ -894,7 +894,7 @@ mod tests {
filter_regex: vec![r"(".to_string()],
..Default::default()
};
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
initialize(1, &config, tx).await;
}
}

View File

@@ -414,9 +414,9 @@ mod tests {
async fn statistics_handler_increments_requests() {
let (stats, tx, handle) = setup_stats_test();
tx.send(StatCommand::AddRequest).unwrap_or_default();
tx.send(StatCommand::AddRequest).unwrap_or_default();
tx.send(StatCommand::AddRequest).unwrap_or_default();
tx.send(Command::AddRequest).unwrap_or_default();
tx.send(Command::AddRequest).unwrap_or_default();
tx.send(Command::AddRequest).unwrap_or_default();
teardown_stats_test(tx, handle).await;
@@ -433,8 +433,8 @@ mod tests {
async fn statistics_handler_increments_403() {
let (stats, tx, handle) = setup_stats_test();
let err = StatCommand::AddError(StatError::Status403);
let err2 = StatCommand::AddError(StatError::Status403);
let err = Command::AddError(StatError::Status403);
let err2 = Command::AddError(StatError::Status403);
tx.send(err).unwrap_or_default();
tx.send(err2).unwrap_or_default();
@@ -456,8 +456,8 @@ mod tests {
async fn statistics_handler_increments_403_via_status_code() {
let (stats, tx, handle) = setup_stats_test();
let err = StatCommand::AddStatus(reqwest::StatusCode::FORBIDDEN);
let err2 = StatCommand::AddStatus(reqwest::StatusCode::FORBIDDEN);
let err = Command::AddStatus(reqwest::StatusCode::FORBIDDEN);
let err2 = Command::AddStatus(reqwest::StatusCode::FORBIDDEN);
tx.send(err).unwrap_or_default();
tx.send(err2).unwrap_or_default();
@@ -477,8 +477,8 @@ mod tests {
async fn statistics_handler_increments_500_via_status_code() {
let (stats, tx, handle) = setup_stats_test();
let err = StatCommand::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
let err2 = StatCommand::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
let err = Command::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
let err2 = Command::AddStatus(reqwest::StatusCode::INTERNAL_SERVER_ERROR);
tx.send(err).unwrap_or_default();
tx.send(err2).unwrap_or_default();

View File

@@ -1,34 +1 @@
use super::{command::StatCommand, container::Stats};
use crate::{event_handlers::StatsHandler, FeroxChannel};
use anyhow::Result;
use std::sync::Arc;
use tokio::{
sync::mpsc::{self, UnboundedSender},
task::JoinHandle,
};
/// Initialize new `Stats` object and the sc side of an mpsc channel that is responsible for
/// updates to the aforementioned object.
pub fn initialize() -> (
Arc<Stats>,
UnboundedSender<StatCommand>,
JoinHandle<Result<()>>,
) {
log::trace!("enter: initialize");
let stats_tracker = Arc::new(Stats::new());
let (tx_stats, rx_stats): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let mut handler = StatsHandler::new(stats_tracker.clone(), rx_stats);
let stats_thread = tokio::spawn(async move { handler.start().await });
log::trace!(
"exit: initialize -> ({:?}, {:?}, {:?})",
stats_tracker,
tx_stats,
stats_thread
);
(stats_tracker, tx_stats, stats_thread)
}

View File

@@ -1,17 +1,13 @@
mod error;
mod macros;
mod container;
mod command;
mod field;
mod init;
#[cfg(test)]
mod tests;
pub use self::command::StatCommand;
pub use self::container::Stats;
pub use self::error::StatError;
pub use self::field::StatField;
pub use self::init::initialize;
#[cfg(test)]
use self::tests::{setup_stats_test, teardown_stats_test};

View File

@@ -1,5 +1,5 @@
use super::*;
use crate::FeroxSerialize;
use crate::{event_handlers::StatsHandler, FeroxSerialize};
use anyhow::Result;
use reqwest::StatusCode;
use std::sync::Arc;
@@ -7,22 +7,15 @@ use tempfile::NamedTempFile;
use tokio::{sync::mpsc::UnboundedSender, task::JoinHandle};
/// simple helper to reduce code reuse
pub fn setup_stats_test() -> (
Arc<Stats>,
UnboundedSender<StatCommand>,
JoinHandle<Result<()>>,
) {
initialize()
pub fn setup_stats_test() -> (Arc<Stats>, UnboundedSender<Command>, JoinHandle<Result<()>>) {
StatsHandler::initialize()
}
/// another helper to stay DRY; must be called after any sent commands and before any checks
/// performed against the Stats object
pub async fn teardown_stats_test(
sender: UnboundedSender<StatCommand>,
handle: JoinHandle<Result<()>>,
) {
pub async fn teardown_stats_test(sender: UnboundedSender<Command>, handle: JoinHandle<Result<()>>) {
// send exit and await, once the await completes, stats should be updated
sender.send(StatCommand::Exit).unwrap_or_default();
sender.send(Command::Exit).unwrap_or_default();
handle.await.unwrap().unwrap();
}
@@ -31,7 +24,7 @@ pub async fn teardown_stats_test(
async fn statistics_handler_exits() {
let (_, sender, handle) = setup_stats_test();
sender.send(StatCommand::Exit).unwrap_or_default();
sender.send(Command::Exit).unwrap_or_default();
handle.await.unwrap().unwrap(); // blocks on the handler's while loop

View File

@@ -41,3 +41,6 @@ pub trait FeroxSerialize: Serialize {
/// Return an NDJSON representation of the object
fn as_json(&self) -> Result<String>;
}
/// todo doc
pub trait HandlerCommand {}

View File

@@ -1,10 +1,8 @@
#![macro_use]
use crate::{
config::{CONFIGURATION, PROGRESS_PRINTER},
statistics::{
StatCommand::{self, AddError, AddStatus},
StatError::{Connection, Other, Redirection, Request, Timeout, UrlFormat},
},
event_handlers::Command::{self, AddError, AddStatus},
statistics::StatError::{Connection, Other, Redirection, Request, Timeout, UrlFormat},
FeroxError,
};
use anyhow::{bail, Context, Result};
@@ -168,7 +166,7 @@ pub fn ferox_print(msg: &str, bar: &ProgressBar) {
#[macro_export]
/// wrapper to improve code readability
macro_rules! update_stat {
macro_rules! send_command {
($tx:expr, $value:expr) => {
$tx.send($value).unwrap_or_default();
};
@@ -183,7 +181,7 @@ pub fn format_url(
add_slash: bool,
queries: &[(String, String)],
extension: Option<&str>,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Result<Url> {
log::trace!(
"enter: format_url({}, {}, {}, {:?} {:?}, {:?})",
@@ -211,7 +209,7 @@ pub fn format_url(
let err = FeroxError { message };
update_stat!(tx_stats, AddError(UrlFormat));
send_command!(tx_stats, AddError(UrlFormat));
log::trace!("exit: format_url -> {}", err);
bail!("{}", err);
@@ -281,7 +279,7 @@ pub fn format_url(
}
}
Err(e) => {
update_stat!(tx_stats, AddError(UrlFormat));
send_command!(tx_stats, AddError(UrlFormat));
log::trace!("exit: format_url -> {}", e);
log::error!("Could not join {} with {}", word, base_url);
bail!("{}", e)
@@ -293,7 +291,7 @@ pub fn format_url(
pub async fn make_request(
client: &Client,
url: &Url,
tx_stats: UnboundedSender<StatCommand>,
tx_stats: UnboundedSender<Command>,
) -> Result<Response> {
log::trace!(
"enter: make_request(CONFIGURATION.Client, {}, {:?})",
@@ -309,29 +307,29 @@ pub async fn make_request(
if e.is_timeout() {
// only warn for timeouts, while actual errors are still left as errors
log_level = log::Level::Warn;
update_stat!(tx_stats, AddError(Timeout));
send_command!(tx_stats, AddError(Timeout));
} else if e.is_redirect() {
if let Some(last_redirect) = e.url() {
// get where we were headed (last_redirect) and where we came from (url)
let fancy_message = format!("{} !=> {}", url, last_redirect);
let report = if let Some(msg_status) = e.status() {
update_stat!(tx_stats, AddStatus(msg_status));
send_command!(tx_stats, AddStatus(msg_status));
create_report_string(msg_status.as_str(), "-1", "-1", "-1", &fancy_message)
} else {
create_report_string("UNK", "-1", "-1", "-1", &fancy_message)
};
update_stat!(tx_stats, AddError(Redirection));
send_command!(tx_stats, AddError(Redirection));
ferox_print(&report, &PROGRESS_PRINTER)
};
} else if e.is_connect() {
update_stat!(tx_stats, AddError(Connection));
send_command!(tx_stats, AddError(Connection));
} else if e.is_request() {
update_stat!(tx_stats, AddError(Request));
send_command!(tx_stats, AddError(Request));
} else {
update_stat!(tx_stats, AddError(Other));
send_command!(tx_stats, AddError(Other));
}
if matches!(log_level, log::Level::Error) {
@@ -344,7 +342,7 @@ pub async fn make_request(
}
Ok(resp) => {
log::trace!("exit: make_request -> {:?}", resp);
update_stat!(tx_stats, AddStatus(resp.status()));
send_command!(tx_stats, AddStatus(resp.status()));
Ok(resp)
}
}
@@ -513,7 +511,7 @@ mod tests {
#[test]
/// base url + 1 word + no slash + no extension
fn format_url_normal() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
assert_eq!(
format_url("http://localhost", "stuff", false, &Vec::new(), None, tx).unwrap(),
reqwest::Url::parse("http://localhost/stuff").unwrap()
@@ -523,7 +521,7 @@ mod tests {
#[test]
/// base url + no word + no slash + no extension
fn format_url_no_word() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
assert_eq!(
format_url("http://localhost", "", false, &Vec::new(), None, tx).unwrap(),
reqwest::Url::parse("http://localhost").unwrap()
@@ -533,7 +531,7 @@ mod tests {
#[test]
/// base url + word + no slash + no extension + queries
fn format_url_joins_queries() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
assert_eq!(
format_url(
"http://localhost",
@@ -551,7 +549,7 @@ mod tests {
#[test]
/// base url + no word + no slash + no extension + queries
fn format_url_without_word_joins_queries() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
assert_eq!(
format_url(
"http://localhost",
@@ -570,14 +568,14 @@ mod tests {
#[should_panic]
/// no base url is an error
fn format_url_no_url() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
format_url("", "stuff", false, &Vec::new(), None, tx).unwrap();
}
#[test]
/// word prepended with slash is adjusted for correctness
fn format_url_word_with_preslash() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
assert_eq!(
format_url("http://localhost", "/stuff", false, &Vec::new(), None, tx).unwrap(),
reqwest::Url::parse("http://localhost/stuff").unwrap()
@@ -587,7 +585,7 @@ mod tests {
#[test]
/// word with appended slash allows the slash to persist
fn format_url_word_with_postslash() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
assert_eq!(
format_url("http://localhost", "stuff/", false, &Vec::new(), None, tx).unwrap(),
reqwest::Url::parse("http://localhost/stuff/").unwrap()
@@ -597,7 +595,7 @@ mod tests {
#[test]
/// word with two prepended slashes doesn't discard the entire domain
fn format_url_word_with_two_prepended_slashes() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let result = format_url(
"http://localhost",
@@ -618,7 +616,7 @@ mod tests {
#[test]
/// word that is a fully formed url, should return an error
fn format_url_word_that_is_a_url() {
let (tx, _): FeroxChannel<StatCommand> = mpsc::unbounded_channel();
let (tx, _): FeroxChannel<Command> = mpsc::unbounded_channel();
let url = format_url(
"http://localhost",
"http://schmocalhost",