Reduce crawling memory consumption,

Increase crawling threads,
Dynamically adjust crawling rate.
vlofgren 2022-08-23 00:35:42 +02:00
parent f48e92630e
commit 6e2fdb7a77
4 changed files with 102 additions and 40 deletions

CrawlerMain.java

@@ -30,7 +30,7 @@ public class CrawlerMain implements AutoCloseable {
private final UserAgent userAgent;
private final ThreadPoolExecutor pool;
final int poolSize = 256;
final int poolSize = 512;
final int poolQueueSize = 32;
public CrawlerMain(EdgeCrawlPlan plan) throws Exception {
@@ -92,7 +92,6 @@ public class CrawlerMain implements AutoCloseable {
AbortMonitor abortMonitor = AbortMonitor.getInstance();
Semaphore taskSem = new Semaphore(poolSize);
plan.forEachCrawlingSpecification(spec -> {
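
The CrawlerMain change above doubles the worker pool from 256 to 512 threads while keeping a Semaphore sized to the pool; a semaphore like that typically provides back-pressure so submission blocks once every worker slot is busy rather than queueing unbounded work in memory. A minimal sketch of that pattern, using hypothetical names (BoundedSubmitSketch, submitBounded) that do not appear in the patch:

    import java.util.concurrent.*;

    class BoundedSubmitSketch {
        // Fixed pool plus a semaphore of the same size: callers block once
        // poolSize tasks are in flight instead of piling work into memory.
        private final int poolSize = 512;
        private final ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        private final Semaphore taskSem = new Semaphore(poolSize);

        void submitBounded(Runnable crawlTask) throws InterruptedException {
            taskSem.acquire();              // waits while all 512 slots are busy
            pool.submit(() -> {
                try {
                    crawlTask.run();
                }
                finally {
                    taskSem.release();      // free the slot when the task finishes
                }
            });
        }
    }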

CrawlerRetreiver.java

@@ -17,7 +17,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
import java.util.ArrayList;
@@ -25,12 +24,19 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.Optional;
import static java.lang.Math.max;
import static java.lang.Math.min;
public class CrawlerRetreiver {
private static final long DEFAULT_CRAWL_DELAY_MS = Long.getLong("defaultCrawlDelay", 1000);
private static final long DEFAULT_CRAWL_DELAY_MIN_MS = Long.getLong("defaultCrawlDelay", 250);
private static final long DEFAULT_CRAWL_DELAY_MAX_MS = Long.getLong("defaultCrawlDelaySlow", 2500);
private final LinkedList<EdgeUrl> queue = new LinkedList<>();
private final HttpFetcher fetcher;
private final HashSet<EdgeUrl> visited;
private final HashSet<EdgeUrl> known;
private final HashSet<String> visited;
private final HashSet<String> known;
private boolean slowDown = false;
private final int depth;
private final String id;
@@ -64,15 +70,13 @@ public class CrawlerRetreiver {
crawledDomainWriter = writer;
for (String urlStr : specs.urls) {
EdgeUrl.parse(urlStr)
.filter(known::add)
.ifPresent(queue::addLast);
EdgeUrl.parse(urlStr).ifPresent(this::addToQueue);
}
if (queue.peek() != null) {
var fst = queue.peek();
var root = fst.domain.toRootUrl();
if (known.add(root))
if (known.add(root.toString()))
queue.addFirst(root);
}
}
@@ -147,7 +151,7 @@ public class CrawlerRetreiver {
continue;
if (top.toString().length() > 255)
continue;
if (!visited.add(top))
if (!visited.add(top.toString()))
continue;
if (fetchDocument(top, crawlDelay)) {
@@ -172,9 +176,7 @@ public class CrawlerRetreiver {
crawledDomainWriter.accept(d);
if (d.url != null) {
try {
visited.add(new EdgeUrl(d.url));
} catch (URISyntaxException ex) {}
EdgeUrl.parse(d.url).map(EdgeUrl::toString).ifPresent(visited::add);
}
}
@@ -192,8 +194,7 @@ public class CrawlerRetreiver {
private Optional<CrawledDocument> fetchUrl(EdgeUrl top) {
try {
var doc = fetcher.fetchContent(top);
var doc = fetchContent(top);
if (doc.documentBody != null) {
@@ -217,6 +218,24 @@ public class CrawlerRetreiver {
}
@SneakyThrows
private CrawledDocument fetchContent(EdgeUrl top) {
for (int i = 0; i < 2; i++) {
try {
return fetcher.fetchContent(top);
}
catch (RateLimitException ex) {
slowDown = true;
int delay = ex.retryAfter();
if (delay > 0 && delay < 5000) {
Thread.sleep(delay);
}
}
}
return createRetryError(top);
}
private String createHash(String documentBodyHash) {
return hashMethod.hashUnencodedChars(documentBodyHash).toString();
}
@@ -235,28 +254,29 @@ public class CrawlerRetreiver {
baseUrl = linkParser.getBaseLink(parsed, baseUrl);
for (var link : parsed.getElementsByTag("a")) {
linkParser.parseLink(baseUrl, link)
.filter(this::isSameDomain)
.filter(u -> !urlBlocklist.isUrlBlocked(u))
.filter(u -> !urlBlocklist.isMailingListLink(u))
.filter(known::add)
.ifPresent(queue::addLast);
linkParser.parseLink(baseUrl, link).ifPresent(this::addToQueue);
}
for (var link : parsed.getElementsByTag("frame")) {
linkParser.parseFrame(baseUrl, link)
.filter(this::isSameDomain)
.filter(u -> !urlBlocklist.isUrlBlocked(u))
.filter(u -> !urlBlocklist.isMailingListLink(u))
.filter(known::add)
.ifPresent(queue::addLast);
linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue);
}
for (var link : parsed.getElementsByTag("iframe")) {
linkParser.parseFrame(baseUrl, link)
.filter(this::isSameDomain)
.filter(u -> !urlBlocklist.isUrlBlocked(u))
.filter(u -> !urlBlocklist.isMailingListLink(u))
.filter(known::add)
.ifPresent(queue::addLast);
linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue);
}
}
private void addToQueue(EdgeUrl url) {
if (!isSameDomain(url))
return;
if (urlBlocklist.isUrlBlocked(url))
return;
if (urlBlocklist.isMailingListLink(url))
return;
// reduce memory usage by not growing queue huge when crawling large sites
if (queue.size() + visited.size() >= depth + 100)
return;
if (known.add(url.toString())) {
queue.addLast(url);
}
}
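
Two memory-related changes meet in addToQueue: the known and visited sets now hold URL strings rather than parsed EdgeUrl objects, and the frontier stops growing once queued plus visited URLs reach the crawl depth plus a small margin. A condensed sketch of that bookkeeping, using illustrative names (Frontier, offer) that are not part of the patch:

    import java.util.HashSet;
    import java.util.LinkedList;

    // Illustrative only; CrawlerRetreiver keeps these collections as plain fields.
    class Frontier {
        private final LinkedList<String> queue = new LinkedList<>();
        private final HashSet<String> known = new HashSet<>();
        private final HashSet<String> visited = new HashSet<>();
        private final int depth;

        Frontier(int depth) { this.depth = depth; }

        boolean offer(String url) {
            // Cap the frontier so crawling a huge site can't balloon the queue.
            if (queue.size() + visited.size() >= depth + 100)
                return false;
            // Deduplicate on the string form; cheaper to retain than a parsed URL object.
            if (!known.add(url))
                return false;
            queue.addLast(url);
            return true;
        }
    }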
@@ -284,13 +304,24 @@ public class CrawlerRetreiver {
if (spentTime > sleepTime)
return;
Thread.sleep(Math.min(sleepTime-spentTime, 5000));
Thread.sleep(min(sleepTime-spentTime, 5000));
}
else if (slowDown) {
Thread.sleep( 1000);
}
else {
if (spentTime > DEFAULT_CRAWL_DELAY_MS)
// When no crawl delay is specified, lean toward twice the fetch+process time,
// within sane limits. This means slower servers get slower crawling, and faster
// servers get faster crawling.
sleepTime = spentTime * 2;
sleepTime = min(sleepTime, DEFAULT_CRAWL_DELAY_MAX_MS);
sleepTime = max(sleepTime, DEFAULT_CRAWL_DELAY_MIN_MS);
if (spentTime > sleepTime)
return;
Thread.sleep(DEFAULT_CRAWL_DELAY_MS - spentTime);
Thread.sleep(sleepTime-spentTime);
}
}
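
When robots.txt specifies no crawl delay, the retriever now aims for roughly twice the time spent fetching and processing the page, clamped between the new minimum and maximum defaults, with the time already spent counting toward the delay. A standalone sketch of just that calculation (the class and method names below are made up for illustration):

    class AdaptiveDelaySketch {
        static final long MIN_MS = 250;   // mirrors DEFAULT_CRAWL_DELAY_MIN_MS
        static final long MAX_MS = 2500;  // mirrors DEFAULT_CRAWL_DELAY_MAX_MS

        // How long to sleep after a fetch that took spentTimeMs.
        static long adaptiveDelayMs(long spentTimeMs) {
            // Lean toward twice the fetch+process time: slow servers are crawled
            // more gently, fast servers more aggressively.
            long sleepTime = spentTimeMs * 2;
            sleepTime = Math.min(sleepTime, MAX_MS);
            sleepTime = Math.max(sleepTime, MIN_MS);

            // The time already spent fetching counts toward the delay.
            return Math.max(0, sleepTime - spentTimeMs);
        }
    }

For example, a 100 ms fetch yields a 150 ms sleep (clamped up by the 250 ms minimum), while a 2000 ms fetch yields 500 ms (the 2500 ms cap minus the time already spent).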
@@ -302,7 +333,14 @@ public class CrawlerRetreiver {
.crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name())
.build();
}
private CrawledDocument createRetryError(EdgeUrl url) {
return CrawledDocument.builder()
.url(url.toString())
.timestamp(LocalDateTime.now().toString())
.httpStatus(429)
.crawlerStatus(CrawlerDocumentStatus.ERROR.name())
.build();
}
private CrawledDomain createErrorPostFromStatus(HttpFetcher.FetchResult ret) {
String ip = findIp(domain);

HttpFetcher.java

@@ -141,7 +141,7 @@ public class HttpFetcher {
}
@SneakyThrows
public CrawledDocument fetchContent(EdgeUrl url) {
public CrawledDocument fetchContent(EdgeUrl url) throws RateLimitException {
if (contentTypeLogic.isUrlLikeBinary(url)) {
logger.debug("Probing suspected binary {}", url);
@@ -192,13 +192,17 @@ public class HttpFetcher {
.build();
}
private CrawledDocument extractBody(EdgeUrl url, Response rsp) throws IOException, URISyntaxException {
private CrawledDocument extractBody(EdgeUrl url, Response rsp) throws IOException, URISyntaxException, RateLimitException {
var responseUrl = new EdgeUrl(rsp.request().url().toString());
if (!Objects.equals(responseUrl.domain, url.domain)) {
return createRedirectResponse(url, rsp, responseUrl);
}
if (rsp.code() == 429) {
throw new RateLimitException(rsp.header("Retry-After", "1000"));
}
var body = rsp.body();
if (null == body) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.ERROR, "No body");

RateLimitException.java

@@ -0,0 +1,21 @@
package nu.marginalia.wmsa.edge.crawling.retreival;
public class RateLimitException extends Exception {
private final String retryAfter;
public RateLimitException(String retryAfter) {
this.retryAfter = retryAfter;
}
@Override
public StackTraceElement[] getStackTrace() { return new StackTraceElement[0]; }
public int retryAfter() {
try {
return Integer.parseInt(retryAfter);
}
catch (NumberFormatException ex) {
return 1000;
}
}
}
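
The new exception hides its stack trace and falls back to a 1000 ms delay when the Retry-After value is not a plain integer. A compact usage sketch in the spirit of the fetchContent retry loop above, where fetchOnce is a hypothetical stand-in for HttpFetcher.fetchContent:

    // Illustrative usage only; assumes it lives alongside RateLimitException.
    abstract class RateLimitedFetchSketch {
        abstract String fetchOnce(String url) throws RateLimitException;

        String fetchWithBackoff(String url) throws InterruptedException {
            for (int attempt = 0; attempt < 2; attempt++) {
                try {
                    return fetchOnce(url);
                }
                catch (RateLimitException ex) {
                    int delayMs = ex.retryAfter();      // 1000 if the header wasn't parseable
                    if (delayMs > 0 && delayMs < 5000)  // ignore absurd Retry-After values
                        Thread.sleep(delayMs);
                }
            }
            return null; // the caller records a retry error, as createRetryError() does above
        }
    }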