(warc) Further tidying
This commit mostly covers exception handling, error propagation, a few bug fixes, and minor changes to log formatting. The CrawlDelayTimer, HTTP 429 responses and IOExceptions are now handled more accurately. A non-standard WarcXEntityRefused WARC record has been introduced; it essentially acts as a rejected 'response' with different semantics. Several existing features have also been refined, among them URL encoding, crawl depth incrementing and the use of Content-Length headers.
Parent: 0889b6d247
Commit: 9fea22b90d

12 changed files with 245 additions and 114 deletions
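
For orientation before the diff: the new WarcXEntityRefused records are written by WarcRecorder when a document is refused (robots.txt, bad content type, timeout, or error) and read back during parquet conversion and WARC resynchronization. The following is a condensed, hypothetical sketch of that round trip; the wrapper class and its method names are invented for illustration, while the jwarc calls are the ones used in the hunks below.

import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcWriter;
import org.netpreserve.jwarc.WarcXEntityRefused;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
import java.time.Instant;

class RefusalRoundTripSketch {
    // Writing side (cf. WarcRecorder.flagAsTimeout): the refusal is an 'x-entity-refused'
    // record whose WARC-Profile URN encodes why the document was not stored.
    static void writeRefusal(WarcWriter writer, URI url) throws IOException {
        var refusal = new WarcXEntityRefused.Builder(url, WarcXEntityRefused.documentProbeTimeout)
                .date(Instant.now())
                .build();
        writer.write(refusal);
    }

    // Reading side (cf. CrawledDocumentParquetRecordFileWriter.convertWarc): the custom record
    // type has to be registered, or jwarc will not materialize it as WarcXEntityRefused.
    static void readRefusals(Path warcFile) throws IOException {
        try (var reader = new WarcReader(warcFile)) {
            WarcXEntityRefused.register(reader);
            for (var record : reader) {
                if (record instanceof WarcXEntityRefused refused) {
                    System.out.println(refused.target() + " refused: " + refused.profile());
                }
            }
        }
    }
}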
@@ -28,6 +28,7 @@ public class WarcSerializableCrawlDataStream implements AutoCloseable, Serializa
         path = file;
         reader = new WarcReader(file);
         WarcXResponseReference.register(reader);
+        WarcXEntityRefused.register(reader);

         backingIterator = reader.iterator();
     }

@@ -9,29 +9,34 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.io.IOException;
+import java.net.URI;
 import java.nio.file.Path;

 public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
     private final ParquetWriter<CrawledDocumentParquetRecord> writer;
     private static final Logger logger = LoggerFactory.getLogger(CrawledDocumentParquetRecordFileWriter.class);

-    public static void convertWarc(String domain, Path warcInputFile, Path parquetOutputFile) throws IOException {
+    public static void convertWarc(String domain, Path warcInputFile, Path parquetOutputFile) {
         try (var warcReader = new WarcReader(warcInputFile);
              var parquetWriter = new CrawledDocumentParquetRecordFileWriter(parquetOutputFile)
         ) {
             WarcXResponseReference.register(warcReader);
+            WarcXEntityRefused.register(warcReader);

             for (var record : warcReader) {
                 if (record instanceof WarcResponse response) {
+                    // this also captures WarcXResponseReference, which inherits from WarcResponse
+                    // and is used to store old responses from previous crawls; in this part of the logic
+                    // we treat them the same as a normal response

                     parquetWriter.write(domain, response);
                 }
+                else if (record instanceof WarcXEntityRefused refused) {
+                    parquetWriter.write(domain, refused);
+                }
                 else if (record instanceof Warcinfo warcinfo) {
-                    parquetWriter.write(domain, warcinfo);
+                    parquetWriter.write(warcinfo);
                 }
-                else {
-                    logger.warn("Skipping record of type {}", record.type());
-                }
             }
         }
         catch (Exception ex) {
@@ -39,31 +44,40 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
         }
     }

-    private void write(String domain, Warcinfo warcinfo) throws IOException {
+    private void write(String domain, WarcXEntityRefused refused) throws IOException {
+        URI profile = refused.profile();
+
+        String meta;
+        if (profile.equals(WarcXEntityRefused.documentRobotsTxtSkippedURN)) {
+            meta = "x-marginalia/advisory;state=robots-txt-skipped";
+        }
+        else if (profile.equals(WarcXEntityRefused.documentBadContentTypeURN)) {
+            meta = "x-marginalia/advisory;state=content-type-failed-probe";
+        }
+        else if (profile.equals(WarcXEntityRefused.documentProbeTimeout)) {
+            meta = "x-marginalia/advisory;state=timeout-probe";
+        }
+        else if (profile.equals(WarcXEntityRefused.documentUnspecifiedError)) {
+            meta = "x-marginalia/advisory;state=doc-error";
+        }
+        else {
+            meta = "x-marginalia/advisory;state=unknown";
+        }
+
+        write(forDocError(domain, refused.target(), meta));
+    }
+
+    private void write(Warcinfo warcinfo) throws IOException {
         String selfDomain = warcinfo.fields().first("domain").orElse("");
         String ip = warcinfo.fields().first("ip").orElse("");
         String probeStatus = warcinfo.fields().first("X-WARC-Probe-Status").orElse("");

         if (probeStatus.startsWith("REDIRECT")) {
             String redirectDomain = probeStatus.substring("REDIRECT;".length());
-            write(new CrawledDocumentParquetRecord(selfDomain,
-                    STR."https://\{redirectDomain}/",
-                    ip,
-                    false,
-                    0,
-                    "x-marginalia/advisory;state=redirect",
-                    new byte[0]
-            ));
+            write(forDomainRedirect(selfDomain, redirectDomain));
         }
         else if (!"OK".equals(probeStatus)) {
-            write(new CrawledDocumentParquetRecord(selfDomain,
-                    STR."https://\{domain}/",
-                    ip,
-                    false,
-                    0,
-                    "x-marginalia/advisory;state=error",
-                    probeStatus.getBytes()
-            ));
+            write(forDomainError(selfDomain, ip, probeStatus));
         }
     }

@@ -83,6 +97,15 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
             return;
         }

+        // We don't want to store robots.txt files, as they are not
+        // interesting for the analysis we want to do. This is important
+        // since txt-files in general are interesting, and we don't want to
+        // exclude them as a class.
+
+        if (fetchOk.uri().getPath().equals("/robots.txt")) {
+            return;
+        }
+
         byte[] bodyBytes;
         String contentType;

@@ -112,4 +135,36 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
     public void close() throws IOException {
         writer.close();
     }
+
+    private CrawledDocumentParquetRecord forDomainRedirect(String domain, String redirectDomain) {
+        return new CrawledDocumentParquetRecord(domain,
+                STR."https://\{redirectDomain}/",
+                "",
+                false,
+                0,
+                "x-marginalia/advisory;state=redirect",
+                new byte[0]
+        );
+    }
+    private CrawledDocumentParquetRecord forDomainError(String domain, String ip, String errorStatus) {
+        return new CrawledDocumentParquetRecord(domain,
+                STR."https://\{domain}/",
+                ip,
+                false,
+                0,
+                "x-marginalia/advisory;state=error",
+                errorStatus.getBytes()
+        );
+    }
+
+    private CrawledDocumentParquetRecord forDocError(String domain, String url, String errorStatus) {
+        return new CrawledDocumentParquetRecord(domain,
+                url,
+                "",
+                false,
+                0,
+                "x-marginalia/advisory;state=error",
+                errorStatus.getBytes()
+        );
+    }
 }

@@ -0,0 +1,45 @@
+package org.netpreserve.jwarc;
+
+import java.io.IOException;
+import java.net.URI;
+
+/** This defines a non-standard extension to WARC for storing old HTTP responses,
+ * essentially a 'response' with different semantics
+ */
+public class WarcXEntityRefused extends WarcRevisit {
+    private static final String TYPE_NAME = "x-entity-refused";
+
+    public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped");
+    public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe");
+    public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe");
+    public static final URI documentUnspecifiedError = URI.create("urn:marginalia/meta/doc/error");
+
+    WarcXEntityRefused(MessageVersion version, MessageHeaders headers, MessageBody body) {
+        super(version, headers, body);
+    }
+
+    public static void register(WarcReader reader) {
+        reader.registerType(TYPE_NAME, WarcXEntityRefused::new);
+    }
+
+    public static class Builder extends AbstractBuilder<WarcXEntityRefused, Builder> {
+        public Builder(URI targetURI, URI profile) {
+            this(targetURI.toString(), profile.toString());
+        }
+
+        public Builder(String targetURI, String profileURI) {
+            super(TYPE_NAME);
+            setHeader("WARC-Target-URI", targetURI);
+            setHeader("WARC-Profile", profileURI);
+        }
+
+        public Builder body(HttpResponse httpResponse) throws IOException {
+            return body(MediaType.HTTP_RESPONSE, httpResponse);
+        }
+
+        @Override
+        public WarcXEntityRefused build() {
+            return build(WarcXEntityRefused::new);
+        }
+    }
+}

@@ -4,9 +4,7 @@ import java.io.IOException;
 import java.net.URI;

 /** This defines a non-standard extension to WARC for storing old HTTP responses,
- * essentially a 'revisit' with a full body, which is not something that is
- * expected by the jwarc parser, and goes against the semantics of the revisit
- * records a fair bit.
+ * essentially a 'response' with different semantics..
  * <p>
  * An x-response-reference record is a response record with a full body, where
  * the data is a reconstructed HTTP response from a previous crawl.

@@ -5,6 +5,8 @@ import com.google.common.hash.Hashing;
 import nu.marginalia.crawling.io.SerializableCrawlDataStream;
 import nu.marginalia.crawling.model.CrawledDocument;
 import nu.marginalia.lsh.EasyLSH;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import javax.annotation.Nullable;
 import java.io.IOException;
@@ -15,6 +17,7 @@ import java.nio.file.Path;
 public class CrawlDataReference implements AutoCloseable {

     private final SerializableCrawlDataStream data;
+    private static final Logger logger = LoggerFactory.getLogger(CrawlDataReference.class);

     public CrawlDataReference(SerializableCrawlDataStream data) {
         this.data = data;
@@ -43,8 +46,9 @@ public class CrawlDataReference implements AutoCloseable {
             }
         }
         catch (IOException ex) {
-            ex.printStackTrace();
+            logger.error("Failed to read next document", ex);
         }

         return null;
     }

@@ -20,8 +20,18 @@ public class CrawlDelayTimer {
         this.delayTime = delayTime;
     }

+    /** Call when we've gotten an HTTP 429 response. This will wait a moment, and then
+     * set a flag that slows down the main crawl delay as well. */
+    public void waitRetryDelay(RateLimitException ex) throws InterruptedException {
+        slowDown = true;
+
+        int delay = ex.retryAfter();
+
+        Thread.sleep(Math.clamp(delay, 100, 5000));
+    }
+
     @SneakyThrows
-    public void delay(long spentTime) {
+    public void waitFetchDelay(long spentTime) {
         long sleepTime = delayTime;

         if (sleepTime >= 1) {
@@ -30,10 +40,6 @@ public class CrawlDelayTimer {

             Thread.sleep(min(sleepTime - spentTime, 5000));
         }
-        else if (slowDown) {
-            // Additional delay when the server is signalling it wants slower requests
-            Thread.sleep( DEFAULT_CRAWL_DELAY_MIN_MS);
-        }
         else {
             // When no crawl delay is specified, lean toward twice the fetch+process time,
             // within sane limits. This means slower servers get slower crawling, and faster
@@ -48,10 +54,10 @@ public class CrawlDelayTimer {

             Thread.sleep(sleepTime - spentTime);
         }
-    }

-    /** Increase the delay between requests if the server is signalling it wants slower requests with HTTP 429 */
-    public void slowDown() {
-        slowDown = true;
+        if (slowDown) {
+            // Additional delay when the server is signalling it wants slower requests
+            Thread.sleep( DEFAULT_CRAWL_DELAY_MIN_MS);
+        }
     }
 }

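A quick, standalone illustration of the clamp used by the new waitRetryDelay above (this snippet is not part of the commit; Math.clamp needs Java 21+): an aggressive Retry-After hint of 30 seconds is capped to 5 seconds, and a zero or missing hint still waits at least 100 ms.

public class RetryDelayClampDemo {
    public static void main(String[] args) {
        System.out.println(Math.clamp(30_000, 100, 5000)); // prints 5000
        System.out.println(Math.clamp(0, 100, 5000));      // prints 100
    }
}
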
@@ -3,7 +3,6 @@ package nu.marginalia.crawl.retreival;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hashing;
 import crawlercommons.robots.SimpleRobotRules;
-import lombok.SneakyThrows;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.contenttype.ContentType;
 import nu.marginalia.crawl.retreival.fetcher.ContentTags;
@@ -19,6 +18,7 @@ import nu.marginalia.ip_blocklist.UrlBlocklist;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.EdgeUrl;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
+import org.jsoup.Jsoup;
 import org.jsoup.nodes.Document;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,6 +32,7 @@ import java.util.*;
 public class CrawlerRetreiver implements AutoCloseable {

     private static final int MAX_ERRORS = 20;
+    private static final int HTTP_429_RETRY_LIMIT = 1; // Retry 429s once

     private final HttpFetcher fetcher;

@@ -40,7 +41,6 @@ public class CrawlerRetreiver implements AutoCloseable {
     private static final LinkParser linkParser = new LinkParser();
     private static final Logger logger = LoggerFactory.getLogger(CrawlerRetreiver.class);

-    private static final HashFunction hashMethod = Hashing.murmur3_128(0);
     private static final UrlBlocklist urlBlocklist = new UrlBlocklist();
     private static final LinkFilterSelector linkFilterSelector = new LinkFilterSelector();

@@ -104,7 +104,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             resync.run(warcFile);
         }

-    private int crawlDomain(CrawlDataReference oldCrawlData, DomainProber.ProbeResult probeResult, DomainLinks domainLinks) throws IOException {
+    private int crawlDomain(CrawlDataReference oldCrawlData, DomainProber.ProbeResult probeResult, DomainLinks domainLinks) throws IOException, InterruptedException {
         String ip = findIp(domain);

         EdgeUrl rootUrl;
@@ -124,7 +124,7 @@ public class CrawlerRetreiver implements AutoCloseable {
         final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain, warcRecorder);
         final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());

-        sniffRootDocument(delayTimer, rootUrl);
+        sniffRootDocument(rootUrl);

         // Play back the old crawl data (if present) and fetch the documents comparing etags and last-modified
         int recrawled = recrawl(oldCrawlData, robotsRules, delayTimer);
@@ -181,10 +181,16 @@ public class CrawlerRetreiver implements AutoCloseable {
                 continue;


+            try {
                 if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isOk()) {
                     fetchedCount++;
                 }
             }
+            catch (InterruptedException ex) {
+                Thread.currentThread().interrupt();
+                break;
+            }
+        }

         ret.cookies = fetcher.getCookies();

@@ -192,17 +198,17 @@ public class CrawlerRetreiver implements AutoCloseable {
     }

     /** Using the old crawl data, fetch the documents comparing etags and last-modified */
-    private int recrawl(CrawlDataReference oldCrawlData, SimpleRobotRules robotsRules, CrawlDelayTimer delayTimer) {
+    private int recrawl(CrawlDataReference oldCrawlData, SimpleRobotRules robotsRules, CrawlDelayTimer delayTimer) throws InterruptedException {
         return crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
     }

-    private void sniffRootDocument(CrawlDelayTimer delayTimer, EdgeUrl rootUrl) {
+    private void sniffRootDocument(EdgeUrl rootUrl) {
         try {
             logger.debug("Configuring link filter");

             var url = rootUrl.withPathAndParam("/", null);

-            var result = tryDownload(url, delayTimer, ContentTags.empty());
+            var result = fetcher.fetchContent(url, warcRecorder, ContentTags.empty());
             if (!(result instanceof HttpFetchResult.ResultOk ok))
                 return;

@@ -240,21 +246,27 @@ public class CrawlerRetreiver implements AutoCloseable {

     public HttpFetchResult fetchWriteAndSleep(EdgeUrl top,
                                               CrawlDelayTimer timer,
-                                              DocumentWithReference reference) {
+                                              DocumentWithReference reference) throws InterruptedException
+    {
         logger.debug("Fetching {}", top);

+        HttpFetchResult fetchedDoc = new HttpFetchResult.ResultNone();
+
         long startTime = System.currentTimeMillis();

         var contentTags = reference.getContentTags();
-        var fetchedDoc = tryDownload(top, timer, contentTags);

-        if (fetchedDoc instanceof HttpFetchResult.Result304Raw) {
-            var doc = reference.doc();
-            if (doc != null) {
-                warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody);
-                fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url,
-                        new ContentType(doc.contentType, "UTF-8"),
-                        doc.documentBody);
+        // Fetch the document, retrying if we get a rate limit exception
+        for (int i = 0; i <= HTTP_429_RETRY_LIMIT; i++) {
+            try {
+                fetchedDoc = fetcher.fetchContent(top, warcRecorder, contentTags);
+                break;
+            }
+            catch (RateLimitException ex) {
+                timer.waitRetryDelay(ex);
+            }
+            catch (Exception ex) {
+                logger.warn("Failed to fetch {}", top, ex);
+                fetchedDoc = new HttpFetchResult.ResultException(ex);
             }
         }

@@ -268,14 +280,19 @@ public class CrawlerRetreiver implements AutoCloseable {
                 crawlFrontier.addVisited(new EdgeUrl(ok.uri()));
             }
         }
-        else if (fetchedDoc instanceof HttpFetchResult.Result304ReplacedWithReference retained) {
-            var docOpt = retained.parseDocument();
-            if (docOpt.isPresent()) {
-                var doc = docOpt.get();
-
-                crawlFrontier.enqueueLinksFromDocument(top, doc);
-                EdgeUrl.parse(retained.url()).ifPresent(crawlFrontier::addVisited);
-            }
+        else if (fetchedDoc instanceof HttpFetchResult.Result304Raw && reference.doc() != null) {
+            var doc = reference.doc();
+
+            warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody);
+            fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url,
+                    new ContentType(doc.contentType, "UTF-8"),
+                    doc.documentBody);
+
+            var parsed = Jsoup.parse(doc.documentBody);
+
+            crawlFrontier.enqueueLinksFromDocument(top, parsed);
+            crawlFrontier.addVisited(top);
         }
         else if (fetchedDoc instanceof HttpFetchResult.ResultException ex) {
             errorCount ++;
@@ -285,7 +302,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             logger.error("Error parsing document {}", top, ex);
         }

-        timer.delay(System.currentTimeMillis() - startTime);
+        timer.waitFetchDelay(System.currentTimeMillis() - startTime);

         return fetchedDoc;
     }
@@ -295,33 +312,6 @@ public class CrawlerRetreiver implements AutoCloseable {
                 || proto.equalsIgnoreCase("https");
     }

-    @SneakyThrows
-    private HttpFetchResult tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) {
-        for (int i = 0; i < 2; i++) {
-            try {
-                return fetcher.fetchContent(top, warcRecorder, tags);
-            }
-            catch (RateLimitException ex) {
-                timer.slowDown();
-
-                int delay = ex.retryAfter();
-                if (delay > 0 && delay < 5000) {
-                    Thread.sleep(delay);
-                }
-            }
-            catch (Exception ex) {
-                logger.warn("Failed to fetch {}", top, ex);
-                return new HttpFetchResult.ResultException(ex);
-            }
-        }
-
-        return new HttpFetchResult.ResultNone();
-    }
-
-    private String createHash(String documentBodyHash) {
-        return hashMethod.hashUnencodedChars(documentBodyHash).toString();
-    }
-
     // FIXME this does not belong in the crawler
     private Optional<EdgeUrl> findCanonicalUrl(EdgeUrl baseUrl, Document parsed) {
         baseUrl = baseUrl.domain.toRootUrl();

@@ -34,6 +34,7 @@ public class CrawlerWarcResynchronizer {
         // First pass, enqueue links
         try (var reader = new WarcReader(tempFile)) {
             WarcXResponseReference.register(reader);
+            WarcXEntityRefused.register(reader);

             for (var item : reader) {
                 accept(item);
@@ -58,13 +59,26 @@ public class CrawlerWarcResynchronizer {
                 response(rsp);
             } else if (item instanceof WarcRequest req) {
                 request(req);
+            } else if (item instanceof WarcXEntityRefused refused) {
+                refused(refused);
             }

         }
         catch (Exception ex) {
             logger.info(STR."Failed to process warc record \{item}", ex);
         }
     }

+    private void refused(WarcXEntityRefused refused) {
+        // In general, we don't want to re-crawl urls that were refused,
+        // but to permit circumstances to change over time, we'll
+        // allow for a small chance of re-probing these entries
+
+        if (Math.random() > 0.1) {
+            crawlFrontier.addVisited(new EdgeUrl(refused.targetURI()));
+        }
+    }
+
     private void request(WarcRequest request) {
         EdgeUrl.parse(request.target()).ifPresent(crawlFrontier::addVisited);
     }

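A note on the probabilistic skip in refused() above: Math.random() > 0.1 holds about 90% of the time, so roughly nine out of ten previously refused URLs are marked as already visited (and skipped), while about one in ten remains eligible for a fresh probe.
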
@@ -50,9 +50,14 @@ public class DomainCrawlFrontier {
         }
     }

+    /** Increase the depth of the crawl by a factor. If the current depth is smaller
+     * than the number of already visited documents, the base depth will be adjusted
+     * to the visited count first.
+     */
     public void increaseDepth(double depthIncreaseFactor) {
-        depth = (int)(depth * depthIncreaseFactor);
+        depth = (int)(Math.max(visited.size(), depth) * depthIncreaseFactor);
     }

     public void setLinkFilter(Predicate<EdgeUrl> linkFilter) {
         this.linkFilter = linkFilter;
     }

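To make the revised increaseDepth formula concrete, a small standalone example with hypothetical numbers (not from the commit):

public class IncreaseDepthDemo {
    public static void main(String[] args) {
        int depth = 100;         // the frontier's current depth
        int visitedCount = 250;  // visited.size(): documents already visited
        double factor = 1.5;

        int before = (int) (depth * factor);                          // old formula: 150
        int after  = (int) (Math.max(visitedCount, depth) * factor);  // new formula: 375
        System.out.println(before + " -> " + after);
    }
}

So once a domain has yielded more documents than its nominal depth, the visited count becomes the base that the factor multiplies.
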
@@ -20,7 +20,10 @@ public class WarcProtocolReconstructor {

     static String getHttpRequestString(Request request, URI uri) {
         StringBuilder requestStringBuilder = new StringBuilder();
-        requestStringBuilder.append(request.method()).append(" ").append(URLEncoder.encode(uri.getPath(), StandardCharsets.UTF_8));
+
+        final String encodedURL = encodeURLKeepSlashes(uri.getPath());
+
+        requestStringBuilder.append(request.method()).append(" ").append(encodedURL);

         if (uri.getQuery() != null) {
             requestStringBuilder.append("?").append(uri.getQuery());
@@ -37,6 +40,19 @@ public class WarcProtocolReconstructor {
         return requestStringBuilder.toString();
     }

+    /** Java's URLEncoder will URLEncode slashes, which is not desirable
+     * when sanitizing a URL for HTTP protocol purposes
+     */
+    private static String encodeURLKeepSlashes(String URL) {
+        String[] parts = StringUtils.split(URL,"/");
+        StringJoiner joiner = new StringJoiner("/");
+        for (String part : parts) {
+            joiner.add(URLEncoder.encode(part, StandardCharsets.UTF_8));
+        }
+        return joiner.toString();
+    }
+
     static String getResponseHeader(String headersAsString, int code) {
         String version = "1.1";

@@ -131,6 +147,11 @@ public class WarcProtocolReconstructor {
             if (headerCapitalized.startsWith("X-Marginalia"))
                 return;

+            // Omit Transfer-Encoding header, as we'll be using Content-Length
+            // instead in the warc file, despite what the server says
+            if (headerCapitalized.startsWith("Transfer-Encoding"))
+                return;
+
             for (var value : values) {
                 joiner.add(headerCapitalized + ": " + value);
             }

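To illustrate the intent behind encodeURLKeepSlashes, here is a standalone rendition of the same idea using only the JDK (it uses String.split rather than StringUtils, so it is an approximation of the helper above, not the helper itself):

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.StringJoiner;

public class PathEncodingDemo {
    public static void main(String[] args) {
        String path = "/blog/hello world.html";

        // Encoding the whole path also encodes the separators:
        System.out.println(URLEncoder.encode(path, StandardCharsets.UTF_8));
        // -> %2Fblog%2Fhello+world.html

        // Encoding segment by segment keeps the slashes literal:
        StringJoiner joiner = new StringJoiner("/");
        for (String part : path.split("/")) {
            joiner.add(URLEncoder.encode(part, StandardCharsets.UTF_8));
        }
        System.out.println(joiner);
        // -> /blog/hello+world.html (note URLEncoder still uses '+' for spaces)
    }
}
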
@@ -29,11 +29,6 @@ import java.util.*;
  * be reconstructed.
  */
 public class WarcRecorder implements AutoCloseable {
-    public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped");
-    public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe");
-    public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe");
-    public static final URI documentUnspecifiedError = URI.create("urn:marginalia/meta/doc/error");
-
     private static final int MAX_TIME = 30_000;
     private static final int MAX_SIZE = 1024 * 1024 * 10;
     private final WarcWriter writer;
@@ -91,6 +86,8 @@ public class WarcRecorder implements AutoCloseable {

         ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer();

+        boolean hasCookies = !client.cookieJar().loadForRequest(request.url()).isEmpty();
+
         try (var response = call.execute()) {
             var body = response.body();
             InputStream inputStream;
@@ -143,6 +140,7 @@ public class WarcRecorder implements AutoCloseable {

         WarcResponse.Builder responseBuilder = new WarcResponse.Builder(responseUri)
                 .blockDigest(responseDigestBuilder.build())
+                .addHeader("X-Has-Cookies", hasCookies ? "1" : "0")
                 .date(date)
                 .body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes());

@@ -280,11 +278,11 @@ public class WarcRecorder implements AutoCloseable {

     public void flagAsRobotsTxtError(EdgeUrl top) {
         try {
-            WarcRevisit revisit = new WarcRevisit.Builder(top.asURI(), documentRobotsTxtSkippedURN)
+            WarcXEntityRefused refusal = new WarcXEntityRefused.Builder(top.asURI(), WarcXEntityRefused.documentRobotsTxtSkippedURN)
                     .date(Instant.now())
                     .build();

-            writer.write(revisit);
+            writer.write(refusal);
         } catch (URISyntaxException | IOException e) {
             throw new RuntimeException(e);
         }
@@ -292,13 +290,13 @@ public class WarcRecorder implements AutoCloseable {

     public void flagAsFailedContentTypeProbe(EdgeUrl url, String contentType, int status) {
         try {
-            WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentBadContentTypeURN)
+            WarcXEntityRefused refusal = new WarcXEntityRefused.Builder(url.asURI(), WarcXEntityRefused.documentBadContentTypeURN)
                     .date(Instant.now())
                     .addHeader("Rejected-Content-Type", contentType)
                     .addHeader("Http-Status", Integer.toString(status))
                     .build();

-            writer.write(revisit);
+            writer.write(refusal);
         } catch (URISyntaxException | IOException e) {
             throw new RuntimeException(e);
         }
@@ -306,13 +304,13 @@ public class WarcRecorder implements AutoCloseable {

     public void flagAsError(EdgeUrl url, Exception ex) {
         try {
-            WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentUnspecifiedError)
+            WarcXEntityRefused refusal = new WarcXEntityRefused.Builder(url.asURI(), WarcXEntityRefused.documentUnspecifiedError)
                     .date(Instant.now())
                     .addHeader("Exception", ex.getClass().getSimpleName())
                     .addHeader("ErrorMessage", Objects.requireNonNullElse(ex.getMessage(), ""))
                     .build();

-            writer.write(revisit);
+            writer.write(refusal);
         } catch (URISyntaxException | IOException e) {
             throw new RuntimeException(e);
         }
@@ -320,11 +318,11 @@ public class WarcRecorder implements AutoCloseable {

     public void flagAsTimeout(EdgeUrl url) {
         try {
-            WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentProbeTimeout)
+            WarcXEntityRefused refusal = new WarcXEntityRefused.Builder(url.asURI(), WarcXEntityRefused.documentProbeTimeout)
                     .date(Instant.now())
                     .build();

-            writer.write(revisit);
+            writer.write(refusal);
         } catch (URISyntaxException | IOException e) {
             throw new RuntimeException(e);
         }

@@ -15,13 +15,6 @@ import org.jsoup.Jsoup;
  * E-Tag and Last-Modified headers.
  */
 public class CrawlerRevisitor {
-    /** recrawlState tag for documents that had a HTTP status 304 */
-    public static final String documentWasRetainedTag = "RETAINED/304";
-
-    /** recrawlState tag for documents that had a 200 status but were identical to a previous version */
-    public static final String documentWasSameTag = "SAME-BY-COMPARISON";
-
-
     private final DomainCrawlFrontier crawlFrontier;
     private final CrawlerRetreiver crawlerRetreiver;
     private final WarcRecorder warcRecorder;
@@ -37,7 +30,8 @@ public class CrawlerRevisitor {
     /** Performs a re-crawl of old documents, comparing etags and last-modified */
     public int recrawl(CrawlDataReference oldCrawlData,
                        SimpleRobotRules robotsRules,
-                       CrawlDelayTimer delayTimer) {
+                       CrawlDelayTimer delayTimer)
+            throws InterruptedException {
         int recrawled = 0;
         int retained = 0;
