perf(grpc): add arc handling inner crawl
j-mendez committed Feb 16, 2023
1 parent eea86a9 commit f555648
Showing 7 changed files with 30 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "website_crawler"
version = "0.7.37"
version = "0.7.38"
authors = ["Jeff Mendez <[email protected]>"]
edition = "2018"
description = "gRPC tokio based web crawler"
2 changes: 1 addition & 1 deletion README.md
@@ -60,7 +60,7 @@ This is a basic example crawling a web page, add spider to your `Cargo.toml`:

```toml
[dependencies]
website_crawler = "0.7.36"
website_crawler = "0.7.38"
```

And then the code:
1 change: 1 addition & 0 deletions proto/crawler.proto
@@ -23,4 +23,5 @@ message ScanRequest {
bool tld = 6; // allow tld crawling all . ext.
string proxy = 7; // connect to proxy.
bool sitemap = 8; // extend crawl with sitemap links.
uint64 delay = 9; // the delay throttle between links.
}
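
The new `delay` field is additive, so existing callers keep working. For a caller that wants the per-link throttle, setting it on the generated request type might look like the fragment below; this is a hedged sketch, assuming the usual tonic/prost codegen for `proto/crawler.proto` (module name `crawler`) and a millisecond interpretation of `delay`, with fields not shown in this diff left to their defaults.

```rust
// Fragment, not a full program: `crawler` is assumed to be the module
// tonic/prost generates from proto/crawler.proto.
let req = crawler::ScanRequest {
    sitemap: true,
    delay: 250, // new field 9: throttle between links, assumed milliseconds
    ..Default::default() // remaining request fields left at their defaults
};
```
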
40 changes: 19 additions & 21 deletions src/packages/spider/website.rs
@@ -1,6 +1,6 @@
use super::black_list::contains;
use super::configuration::Configuration;
use super::page::{build, Page, get_page_selectors};
use super::page::{build, get_page_selectors, Page};
use super::robotparser::RobotFileParser;
use super::utils::log;
use crate::rpc::client::{monitor, WebsiteServiceClient};
@@ -16,9 +16,7 @@ use sitemap::{
use std::sync::Arc;
use std::time::Duration;
use tokio;
use tokio::sync::mpsc::{
unbounded_channel, UnboundedReceiver, UnboundedSender,
};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::task;
use tokio_stream::StreamExt;
@@ -45,8 +43,6 @@ pub struct Website {
links: HashSet<String>,
/// contains all visited URL.
links_visited: HashSet<String>,
/// contains page visited
pages: Vec<Page>,
/// Robot.txt parser holder.
robot_file_parser: Option<RobotFileParser>,
/// current sitemap url
@@ -66,19 +62,14 @@ impl Website {
links_visited: HashSet::new(),
links: HashSet::from([domain_base.clone()]), // todo: remove dup mem usage for domain tracking
domain: domain_base,
pages: Vec::new(),
robot_file_parser: None,
sitemap_url: String::from(""),
}
}

/// page clone
pub fn get_pages(&self) -> Vec<Page> {
if !self.pages.is_empty() {
self.pages.clone()
} else {
self.links_visited.iter().map(|l| build(l, "")).collect()
}
self.links_visited.iter().map(|l| build(l, "")).collect()
}

/// links visited getter
@@ -196,16 +187,21 @@ impl Website {

/// Start to crawl website concurrently
async fn crawl_concurrent(&mut self, client: &Client) {
let selector = Arc::new(get_page_selectors(&self.domain, self.configuration.subdomains, self.configuration.subdomains));
let selector = Arc::new(get_page_selectors(
&self.domain,
self.configuration.subdomains,
self.configuration.subdomains,
));
let throttle = self.get_delay();

// crawl while links exists
while !self.links.is_empty() {
let (tx, mut rx): (UnboundedSender<Message>, UnboundedReceiver<Message>) = unbounded_channel();
let (tx, mut rx): (UnboundedSender<Message>, UnboundedReceiver<Message>) =
unbounded_channel();

let stream = tokio_stream::iter(&self.links).throttle(throttle);
tokio::pin!(stream);

while let Some(link) = stream.next().await {
if !self.is_allowed(link) {
continue;
@@ -256,6 +252,7 @@ impl Website {
self.configuration.subdomains,
self.configuration.tld,
));
let rpcx = Arc::new(rpcx);
let throttle = self.get_delay();

// determine if crawl is still active
@@ -272,13 +269,13 @@

if self.configuration.sitemap {
self.sitemap_crawl(
&handle, client, rpcx, &semaphore, &selector, &throttle, user_id, &txx
&handle, client, &rpcx, &semaphore, &selector, &throttle, user_id, &txx,
)
.await;
};

self.inner_crawl(
&handle, client, rpcx, &semaphore, &selector, &throttle, user_id, &txx
&handle, client, &rpcx, &semaphore, &selector, &throttle, user_id, &txx,
)
.await;
}
@@ -288,7 +285,7 @@
&mut self,
handle: &tokio::task::JoinHandle<bool>,
client: &Client,
rpcx: &mut WebsiteServiceClient<Channel>,
rpcx: &Arc<&mut WebsiteServiceClient<Channel>>,
semaphore: &Arc<Semaphore>,
selector: &Arc<(Selector, String)>,
throttle: &Duration,
@@ -338,12 +335,12 @@
&mut self,
handle: &tokio::task::JoinHandle<bool>,
client: &Client,
rpcx: &mut WebsiteServiceClient<Channel>,
rpcx: &Arc<&mut WebsiteServiceClient<Channel>>,
semaphore: &Arc<Semaphore>,
selector: &Arc<(Selector, String)>,
throttle: &Duration,
user_id: u32,
txx: &UnboundedSender<bool>
txx: &UnboundedSender<bool>,
) {
self.sitemap_url = string_concat!(self.domain, "sitemap.xml");

@@ -394,7 +391,8 @@

// crawl between each link
self.inner_crawl(
&handle, client, rpcx, &semaphore, &selector, &throttle, user_id, &txx
&handle, client, rpcx, &semaphore, &selector, &throttle,
user_id, &txx,
)
.await;
}
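
The headline change in `website.rs` is that the gRPC client handle is wrapped in an `Arc` once (`let rpcx = Arc::new(rpcx);`) and both `sitemap_crawl` and `inner_crawl` now take `&Arc<&mut WebsiteServiceClient<Channel>>`, so the same handle is shared across both phases of the crawl. A minimal, self-contained sketch of that sharing pattern is below; the `Client` alias and the task body are stand-ins for illustration, not the crate's code.

```rust
use std::sync::Arc;
use tokio::sync::Semaphore;

// Stand-in for WebsiteServiceClient<Channel>; the real client is not needed
// to illustrate the Arc sharing pattern.
type Client = String;

// Mirrors the shape of inner_crawl/sitemap_crawl after this commit:
// the caller wraps the client once and each helper borrows the Arc.
async fn inner_crawl(rpcx: &Arc<Client>, semaphore: &Arc<Semaphore>) {
    let permit = semaphore.clone().acquire_owned().await.unwrap();
    let rpcx = Arc::clone(rpcx); // cheap clone handed to the spawned task
    tokio::spawn(async move {
        println!("reporting link over {rpcx}");
        drop(permit);
    })
    .await
    .unwrap();
}

#[tokio::main]
async fn main() {
    let rpcx = Arc::new(Client::from("grpc-client"));
    let semaphore = Arc::new(Semaphore::new(8));

    // Both phases reuse the same Arc, as sitemap_crawl and inner_crawl do here.
    inner_crawl(&rpcx, &semaphore).await;
    inner_crawl(&rpcx, &semaphore).await;
}
```
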
4 changes: 4 additions & 0 deletions src/rpc/server.rs
@@ -26,6 +26,7 @@ impl Crawler for MyCrawler {
let id = req.id;
let proxy = req.proxy;
let sitemap = req.sitemap;
let delay = req.delay;

let reply = crawler::ScanReply {
message: format!("scanning - {:?}", &url).into(),
@@ -41,6 +42,7 @@
tld,
proxy,
sitemap,
delay,
)
.await
.unwrap_or_default();
@@ -60,6 +62,7 @@ impl Crawler for MyCrawler {
let id = req.id;
let proxy = req.proxy;
let sitemap = req.sitemap;
let delay = req.delay;

let reply = crawler::ScanReply {
message: format!("scanning - {:?}", &url).into(),
@@ -75,6 +78,7 @@
tld,
proxy,
sitemap,
delay,
)
.await
.unwrap_or_default();
3 changes: 2 additions & 1 deletion src/scanner/crawl.rs
@@ -13,11 +13,12 @@ pub async fn crawl(
tld: bool,
proxy: String,
sitemap: bool,
delay: u64,
) -> Result<(), core::fmt::Error> {
let mut website: Website = Website::new(&domain);

website.configuration.respect_robots_txt = respect_robots_txt;
website.configuration.delay = 0;
website.configuration.delay = delay;
website.configuration.subdomains = subdomains;
website.configuration.tld = tld;
website.configuration.proxy = proxy;
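
With `delay` plumbed through from the request instead of being pinned to `0`, the crawl loop's throttle now takes effect. A runnable sketch of the throttling pattern used in `crawl_concurrent` is below; the millisecond interpretation of `delay` (via `get_delay`, which is not shown in this diff) and the link list are assumptions, and it relies on the `time` feature of `tokio-stream`.

```rust
use std::time::Duration;
use tokio_stream::StreamExt;

#[tokio::main]
async fn main() {
    // The configured delay (assumed milliseconds) becomes the per-link throttle.
    let delay: u64 = 250;
    let throttle = Duration::from_millis(delay);

    // Same shape as crawl_concurrent: throttle the link stream, then pin it.
    let links = vec!["https://example.com/a", "https://example.com/b"];
    let stream = tokio_stream::iter(links).throttle(throttle);
    tokio::pin!(stream);

    while let Some(link) = stream.next().await {
        println!("crawling {link}");
    }
}
```
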
3 changes: 2 additions & 1 deletion src/scanner/scan.rs
@@ -16,12 +16,13 @@ pub async fn scan(
tld: bool,
proxy: String,
sitemap: bool,
delay: u64,
) -> Result<(), core::fmt::Error> {
let mut client = create_client().await.unwrap();
let mut website: Website = Website::new(&domain);

website.configuration.respect_robots_txt = respect_robots_txt;
website.configuration.delay = 0;
website.configuration.delay = delay;
website.configuration.subdomains = subdomains;
website.configuration.tld = tld;
website.configuration.proxy = proxy;
