(crawler) WIP integration of WARC files into the crawler process.

At this stage, the crawler uses the WARCs to resume a crawl if it was terminated abnormally.

This is a WIP commit; since the WARC files are not yet fully incorporated into the workflow, they are deleted after each domain is crawled.

The commit also includes fairly invasive refactoring of the crawler classes to achieve better separation of concerns.
Viktor Lofgren 2023-12-11 19:32:58 +01:00
parent 45987a1d98
commit b74a3ebd85
22 changed files with 858 additions and 310 deletions
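
Condensed, the recovery flow this commit introduces in CrawlerMain (shown in full in the diff below) works roughly as follows; the sketch shortens names and omits the domain writer, heartbeat and error handling:

Path liveWarc  = CrawlerOutputFile.createWarcFile(outputDir, id, domain, WarcFileVersion.LIVE);
Path tempWarc  = CrawlerOutputFile.createWarcFile(outputDir, id, domain, WarcFileVersion.TEMP);
Path finalWarc = CrawlerOutputFile.createWarcFile(outputDir, id, domain, WarcFileVersion.FINAL);

// A leftover LIVE (.open) file means the previous run died mid-crawl; keep it around as TEMP
if (Files.exists(liveWarc)) {
    Files.move(liveWarc, tempWarc, StandardCopyOption.REPLACE_EXISTING);
}

try (var warcRecorder = new WarcRecorder(liveWarc);
     var retreiver = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder, writer::accept)) {
    if (Files.exists(tempWarc)) {
        retreiver.syncAbortedRun(tempWarc);  // replay the partial WARC into the crawl frontier
        Files.delete(tempWarc);
    }
    int size = retreiver.fetch(domainLinks, reference);
    Files.move(liveWarc, finalWarc, StandardCopyOption.REPLACE_EXISTING);
}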

View File

@ -224,19 +224,19 @@ public class EdgeUrl implements Serializable {
}
public URL asURL() throws MalformedURLException {
int port = this.port != null ? this.port : switch(proto) {
case "http" -> 80;
case "https" -> 443;
default -> 0;
};
return new URL(this.proto, this.domain.toString(), port, this.path);
try {
return asURI().toURL();
}
catch (URISyntaxException e) {
throw new MalformedURLException(e.getMessage());
}
}
public URI asURI() throws URISyntaxException {
if (port == null)
return new URI(this.proto, null, this.domain.toString(), this.path, this.param);
else
if (port != null) {
return new URI(this.proto, null, this.domain.toString(), this.port, this.path, this.param, null);
}
return new URI(this.proto, this.domain.toString(), this.path, this.param, null);
}
}

View File

@ -14,7 +14,7 @@ public class CrawlerOutputFile {
String second = id.substring(2, 4);
Path destDir = base.resolve(first).resolve(second);
return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
return destDir.resolve(STR."\{id}-\{filesystemSafeName(name)}.zstd");
}
/** Return the Path to a file for the given id and name, creating the prerequisite
@ -31,7 +31,7 @@ public class CrawlerOutputFile {
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(id + "-" + filesystemSafeName(name) + ".zstd");
return destDir.resolve(STR."\{id}-\{filesystemSafeName(name)}.zstd");
}
@ -49,4 +49,25 @@ public class CrawlerOutputFile {
}
public static Path createWarcFile(Path baseDir, String id, String name, WarcFileVersion version) {
if (id.length() < 4) {
id = Strings.repeat("0", 4 - id.length()) + id;
}
String fileName = STR."\{id}-\{filesystemSafeName(name)}.zstd\{version.suffix}";
return baseDir.resolve(fileName);
}
public enum WarcFileVersion {
LIVE(".open"),
TEMP(".tmp"),
FINAL("");
public final String suffix;
WarcFileVersion(String suffix) {
this.suffix = suffix;
}
}
}

View File

@ -24,6 +24,10 @@ public class CrawledDomain implements SerializableCrawlData {
return doc.size();
}
public boolean hasCookies() {
return cookies != null && !cookies.isEmpty();
}
public static final String SERIAL_IDENTIFIER = "// DOMAIN";
@Override
public String getSerialIdentifier() {

View File

@ -79,7 +79,7 @@ public class DomainProcessor {
ret.domain = new EdgeDomain(crawledDomain.domain);
ret.ip = crawledDomain.ip;
cookies = Objects.requireNonNullElse(crawledDomain.cookies, Collections.emptyList()).size() > 0;
cookies = crawledDomain.hasCookies();
ip = crawledDomain.ip;
if (crawledDomain.redirectDomain != null) {

View File

@ -18,6 +18,7 @@ import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawlerOutputFile;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
@ -30,16 +31,16 @@ import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.internal.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.sql.SQLException;
import java.util.*;
import java.util.concurrent.*;
@ -212,8 +213,19 @@ public class CrawlerMain {
@Override
public void run() throws Exception {
Path newWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path finalWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
}
else {
Files.deleteIfExists(tempFile);
}
try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id);
var warcRecorder = new WarcRecorder(); // write to a temp file for now
var warcRecorder = new WarcRecorder(newWarcFile); // writes to the LIVE (.open) file; moved to its final name when the crawl completes
var retreiver = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder, writer::accept);
CrawlDataReference reference = getReference())
{
@ -221,19 +233,33 @@ public class CrawlerMain {
var domainLinks = anchorTagsSource.getAnchorTags(domain);
if (Files.exists(tempFile)) {
retreiver.syncAbortedRun(tempFile);
Files.delete(tempFile);
}
int size = retreiver.fetch(domainLinks, reference);
Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING);
workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);
} catch (Exception e) {
logger.error("Error fetching domain " + domain, e);
Files.deleteIfExists(newWarcFile);
if (tempFile != null) {
Files.deleteIfExists(tempFile);
}
}
finally {
// We don't need to double-count these; it's also kept in the workLog
processingIds.remove(domain);
Thread.currentThread().setName("[idle]");
// FIXME: Remove this when we're done
Files.deleteIfExists(finalWarcFile);
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.crawl.retreival.fetcher;
package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawling.model.CrawledDocument;
@ -70,6 +70,22 @@ public class CrawledDocumentFactory {
.httpStatus(rsp.statusCode())
.url(url.toString())
.build();
}
public static CrawledDocument createRobotsError(EdgeUrl url) {
return CrawledDocument.builder()
.url(url.toString())
.timestamp(LocalDateTime.now().toString())
.httpStatus(-1)
.crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name())
.build();
}
public static CrawledDocument createRetryError(EdgeUrl url) {
return CrawledDocument.builder()
.url(url.toString())
.timestamp(LocalDateTime.now().toString())
.httpStatus(429)
.crawlerStatus(CrawlerDocumentStatus.ERROR.name())
.build();
}
}

View File

@ -9,6 +9,9 @@ import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor;
import nu.marginalia.crawl.retreival.revisit.DocumentWithReference;
import nu.marginalia.crawl.retreival.sitemap.SitemapFetcher;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.*;
import nu.marginalia.ip_blocklist.UrlBlocklist;
@ -20,12 +23,9 @@ import org.jsoup.nodes.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Consumer;
@ -46,18 +46,13 @@ public class CrawlerRetreiver implements AutoCloseable {
private static final LinkFilterSelector linkFilterSelector = new LinkFilterSelector();
private final DomainProber domainProber;
private final SitemapRetriever sitemapRetriever;
private final DomainCrawlFrontier crawlFrontier;
private final WarcRecorder warcRecorder;
private final CrawlerRevisitor crawlerRevisitor;
private final SitemapFetcher sitemapFetcher;
int errorCount = 0;
/** recrawlState tag for documents that had a HTTP status 304 */
private static final String documentWasRetainedTag = "RETAINED/304";
/** recrawlState tag for documents that had a 200 status but were identical to a previous version */
private static final String documentWasSameTag = "SAME-BY-COMPARISON";
public CrawlerRetreiver(HttpFetcher fetcher,
DomainProber domainProber,
CrawlSpecRecord specs,
@ -72,8 +67,10 @@ public class CrawlerRetreiver implements AutoCloseable {
crawledDomainWriter = writer;
this.crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth);
sitemapRetriever = fetcher.createSitemapRetriever();
crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth);
crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, crawledDomainWriter, this, warcRecorder);
sitemapFetcher = new SitemapFetcher(crawlFrontier, fetcher.createSitemapRetriever());
// We must always crawl the index page first, this is assumed when fingerprinting the server
var fst = crawlFrontier.peek();
@ -125,6 +122,12 @@ public class CrawlerRetreiver implements AutoCloseable {
};
}
public void syncAbortedRun(Path warcFile) {
var resync = new CrawlerWarcResynchronizer(crawlFrontier, warcRecorder);
resync.run(warcFile);
}
private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) {
String ip = findIp(domain);
@ -147,9 +150,15 @@ public class CrawlerRetreiver implements AutoCloseable {
crawlFrontier.addAllToQueue(domainLinks.getUrls(rootUrl.proto));
// Add links from the sitemap to the crawl frontier
downloadSitemaps(robotsRules, rootUrl);
sitemapFetcher.downloadSitemaps(robotsRules, rootUrl);
CrawledDomain ret = new CrawledDomain(domain, null, CrawlerDomainStatus.OK.name(), null, ip, new ArrayList<>(), null);
CrawledDomain ret = new CrawledDomain(domain,
null,
CrawlerDomainStatus.OK.name(),
null,
ip,
new ArrayList<>(),
null);
int fetchedCount = recrawled;
@ -161,7 +170,7 @@ public class CrawlerRetreiver implements AutoCloseable {
var top = crawlFrontier.takeNextUrl();
if (!robotsRules.isAllowed(top.toString())) {
crawledDomainWriter.accept(createRobotsError(top));
crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(top));
continue;
}
@ -196,119 +205,9 @@ public class CrawlerRetreiver implements AutoCloseable {
return fetchedCount;
}
/** Performs a re-crawl of old documents, comparing etags and last-modified */
private int recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer) {
int recrawled = 0;
int retained = 0;
for (;;) {
CrawledDocument doc = oldCrawlData.nextDocument();
if (doc == null) {
break;
}
// This Shouldn't Happen (TM)
var urlMaybe = EdgeUrl.parse(doc.url);
if (urlMaybe.isEmpty()) continue;
var url = urlMaybe.get();
// If we've previously 404:d on this URL, we'll refrain from trying to fetch it again
if (doc.httpStatus == 404) {
crawlFrontier.addVisited(url);
continue;
}
if (doc.httpStatus != 200) continue;
if (!robotsRules.isAllowed(url.toString())) {
crawledDomainWriter.accept(createRobotsError(url));
continue;
}
if (!crawlFrontier.filterLink(url))
continue;
if (!crawlFrontier.addVisited(url))
continue;
if (recrawled > 5
&& retained > 0.9 * recrawled
&& Math.random() < 0.9)
{
// Since it looks like most of these documents haven't changed,
// we'll load the documents directly; but we do this in a random
// fashion to make sure we eventually catch changes over time
crawledDomainWriter.accept(doc);
crawlFrontier.addVisited(url);
continue;
}
// GET the document with the stored document as a reference
// providing etag and last-modified headers, so we can recycle the
// document if it hasn't changed without actually downloading it
var fetchedDocOpt = fetchWriteAndSleep(url,
delayTimer,
new DocumentWithReference(doc, oldCrawlData));
if (fetchedDocOpt.isEmpty()) continue;
if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
recrawled ++;
}
return recrawled;
}
private void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) {
List<String> sitemaps = robotsRules.getSitemaps();
List<EdgeUrl> urls = new ArrayList<>(sitemaps.size());
if (!sitemaps.isEmpty()) {
for (var url : sitemaps) {
EdgeUrl.parse(url).ifPresent(urls::add);
}
}
else {
urls.add(rootUrl.withPathAndParam("/sitemap.xml", null));
}
downloadSitemaps(urls);
}
private void downloadSitemaps(List<EdgeUrl> urls) {
Set<String> checkedSitemaps = new HashSet<>();
for (var url : urls) {
// Let's not download sitemaps from other domains for now
if (!crawlFrontier.isSameDomain(url)) {
continue;
}
if (checkedSitemaps.contains(url.path))
continue;
var sitemap = sitemapRetriever.fetchSitemap(url);
if (sitemap.isEmpty()) {
continue;
}
// ensure we don't try to download this sitemap again
// (don't move this up, as we may want to check the same
// path with different protocols until we find one that works)
checkedSitemaps.add(url.path);
crawlFrontier.addAllToQueue(sitemap);
}
logger.debug("Queue is now {}", crawlFrontier.queueSize());
/** Using the old crawl data, fetch the documents comparing etags and last-modified */
private int recrawl(CrawlDataReference oldCrawlData, SimpleRobotRules robotsRules, CrawlDelayTimer delayTimer) {
return crawlerRevisitor.recrawl(oldCrawlData, robotsRules, delayTimer);
}
private void sniffRootDocument(CrawlDelayTimer delayTimer, EdgeUrl rootUrl) {
@ -345,7 +244,7 @@ public class CrawlerRetreiver implements AutoCloseable {
linkParser.parseLink(url, href)
.filter(crawlFrontier::isSameDomain)
.map(List::of)
.ifPresent(this::downloadSitemaps);
.ifPresent(sitemapFetcher::downloadSitemaps);
}
}
catch (Exception ex) {
@ -353,7 +252,7 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
private Optional<CrawledDocument> fetchWriteAndSleep(EdgeUrl top,
public Optional<CrawledDocument> fetchWriteAndSleep(EdgeUrl top,
CrawlDelayTimer timer,
DocumentWithReference reference) {
logger.debug("Fetching {}", top);
@ -365,11 +264,11 @@ public class CrawlerRetreiver implements AutoCloseable {
if (docOpt.isPresent()) {
var doc = docOpt.get();
if (!Objects.equals(doc.recrawlState, documentWasRetainedTag)
if (!Objects.equals(doc.recrawlState, CrawlerRevisitor.documentWasRetainedTag)
&& reference.isContentBodySame(doc))
{
// The document didn't change since the last time
doc.recrawlState = documentWasSameTag;
doc.recrawlState = CrawlerRevisitor.documentWasSameTag;
}
crawledDomainWriter.accept(doc);
@ -408,7 +307,7 @@ public class CrawlerRetreiver implements AutoCloseable {
var parsedDoc = Jsoup.parse(doc.documentBody);
EdgeUrl url = new EdgeUrl(doc.url);
findLinks(url, parsedDoc);
crawlFrontier.enqueueLinksFromDocument(url, parsedDoc);
findCanonicalUrl(url, parsedDoc)
.ifPresent(canonicalLink -> doc.canonicalUrl = canonicalLink.toString());
}
@ -442,34 +341,13 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
return createRetryError(top);
return CrawledDocumentFactory.createRetryError(top);
}
private String createHash(String documentBodyHash) {
return hashMethod.hashUnencodedChars(documentBodyHash).toString();
}
private void findLinks(EdgeUrl baseUrl, Document parsed) {
baseUrl = linkParser.getBaseLink(parsed, baseUrl);
for (var link : parsed.getElementsByTag("a")) {
linkParser.parseLink(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
for (var link : parsed.getElementsByTag("frame")) {
linkParser.parseFrame(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
for (var link : parsed.getElementsByTag("iframe")) {
linkParser.parseFrame(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
for (var link : parsed.getElementsByTag("link")) {
String rel = link.attr("rel");
if (rel.equalsIgnoreCase("next") || rel.equalsIgnoreCase("prev")) {
linkParser.parseLink(baseUrl, link).ifPresent(crawlFrontier::addToQueue);
}
}
}
private Optional<EdgeUrl> findCanonicalUrl(EdgeUrl baseUrl, Document parsed) {
baseUrl = baseUrl.domain.toRootUrl();
@ -488,97 +366,9 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
private CrawledDocument createRobotsError(EdgeUrl url) {
return CrawledDocument.builder()
.url(url.toString())
.timestamp(LocalDateTime.now().toString())
.httpStatus(-1)
.crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name())
.build();
}
private CrawledDocument createRetryError(EdgeUrl url) {
return CrawledDocument.builder()
.url(url.toString())
.timestamp(LocalDateTime.now().toString())
.httpStatus(429)
.crawlerStatus(CrawlerDocumentStatus.ERROR.name())
.build();
}
@Override
public void close() throws Exception {
warcRecorder.close();
}
private record DocumentWithReference(
@Nullable CrawledDocument doc,
@Nullable CrawlDataReference reference) {
private static final DocumentWithReference emptyInstance = new DocumentWithReference(null, null);
public static DocumentWithReference empty() {
return emptyInstance;
}
public boolean isContentBodySame(CrawledDocument newDoc) {
if (reference == null)
return false;
if (doc == null)
return false;
if (doc.documentBody == null)
return false;
if (newDoc.documentBody == null)
return false;
return reference.isContentBodySame(doc, newDoc);
}
private ContentTags getContentTags() {
if (null == doc)
return ContentTags.empty();
String headers = doc.headers;
if (headers == null)
return ContentTags.empty();
String[] headersLines = headers.split("\n");
String lastmod = null;
String etag = null;
for (String line : headersLines) {
if (line.toLowerCase().startsWith("etag:")) {
etag = line.substring(5).trim();
}
if (line.toLowerCase().startsWith("last-modified:")) {
lastmod = line.substring(14).trim();
}
}
return new ContentTags(etag, lastmod);
}
public boolean isEmpty() {
return doc == null || reference == null;
}
/** If the provided document has HTTP status 304, and the reference document is provided,
* return the reference document; otherwise return the provided document.
*/
public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) {
if (doc == null)
return fetchedDoc;
// HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when
// we fetched it last time. We can recycle the reference document.
if (fetchedDoc.httpStatus != 304)
return fetchedDoc;
var ret = doc;
ret.recrawlState = documentWasRetainedTag;
ret.timestamp = LocalDateTime.now().toString();
return ret;
}
}
}

View File

@ -0,0 +1,110 @@
package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
/**
* This class is responsible for resynchronizing the crawl frontier with a partially written
* warc file. This may happen if the crawl is interrupted or crashes.
* <p>
* This is best-effort and not guaranteed to recover all data, but it should limit
* the amount of data that is lost and needs to be re-crawled in the event of an unexpected
* shutdown.
*/
public class CrawlerWarcResynchronizer {
private final DomainCrawlFrontier crawlFrontier;
private final WarcRecorder recorder;
private static final Logger logger = LoggerFactory.getLogger(CrawlerWarcResynchronizer.class);
public CrawlerWarcResynchronizer(DomainCrawlFrontier crawlFrontier, WarcRecorder recorder) {
this.crawlFrontier = crawlFrontier;
this.recorder = recorder;
}
public void run(Path tempFile) {
// First pass, enqueue links
try (var reader = new WarcReader(tempFile)) {
for (var item : reader) {
accept(item);
}
} catch (IOException e) {
logger.info(STR."Failed read full warc file \{tempFile}", e);
}
// Second pass, copy records to the new warc file
try (var reader = new WarcReader(tempFile)) {
for (var item : reader) {
recorder.resync(item);
}
} catch (IOException e) {
logger.info(STR."Failed read full warc file \{tempFile}", e);
}
}
public void accept(WarcRecord item) {
try {
if (item instanceof WarcResponse rsp) {
response(rsp);
} else if (item instanceof WarcRevisit revisit) {
revisit(revisit);
} else if (item instanceof WarcRequest req) {
request(req);
}
}
catch (Exception ex) {
logger.info(STR."Failed to process warc record \{item}", ex);
}
}
private void request(WarcRequest request) {
EdgeUrl.parse(request.target()).ifPresent(crawlFrontier::addVisited);
}
private void response(WarcResponse rsp) {
var url = new EdgeUrl(rsp.targetURI());
crawlFrontier.addVisited(url);
try {
var response = HttpFetchResult.importWarc(rsp);
if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) {
var doc = Jsoup.parse(ok.body());
crawlFrontier.enqueueLinksFromDocument(url, doc);
}
}
catch (Exception e) {
logger.info(STR."Failed to parse response body for \{url}", e);
}
}
private void revisit(WarcRevisit revisit) throws IOException {
if (!WarcRecorder.revisitURI.equals(revisit.profile())) {
return;
}
var url = new EdgeUrl(revisit.targetURI());
crawlFrontier.addVisited(url);
try {
var response = HttpFetchResult.importWarc(revisit);
if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) {
var doc = Jsoup.parse(ok.body());
crawlFrontier.enqueueLinksFromDocument(url, doc);
}
}
catch (Exception e) {
logger.info(STR."Failed to parse response body for \{url}", e);
}
}
}
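
As a usage sketch (mirroring the unit test further down in this commit), recovering from an aborted run amounts to replaying the partial WARC into a fresh frontier and a new recorder; the domain and file paths here are placeholders:

var frontier = new DomainCrawlFrontier(new EdgeDomain("www.example.com"), List.of(), 100);
try (var recorder = new WarcRecorder(newWarcFile)) {
    // copies the old records into newWarcFile and marks their URLs as visited
    new CrawlerWarcResynchronizer(frontier, recorder).run(partialWarcFile);
}
// frontier now reflects which URLs the aborted run already fetched (and the links they contained),
// so the resumed crawl skips them instead of starting over.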

View File

@ -3,14 +3,19 @@ package nu.marginalia.crawl.retreival;
import com.google.common.hash.HashFunction;
import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.nodes.Document;
import java.net.URISyntaxException;
import java.util.*;
import java.util.function.Predicate;
public class DomainCrawlFrontier {
private static final LinkParser linkParser = new LinkParser();
private final ArrayDeque<String> queue;
// To save the number of strings kept in memory,
@ -141,4 +146,27 @@ public class DomainCrawlFrontier {
public int queueSize() {
return queue.size();
}
public void enqueueLinksFromDocument(EdgeUrl baseUrl, Document parsed) {
baseUrl = linkParser.getBaseLink(parsed, baseUrl);
for (var link : parsed.getElementsByTag("a")) {
linkParser.parseLink(baseUrl, link).ifPresent(this::addToQueue);
}
for (var link : parsed.getElementsByTag("frame")) {
linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue);
}
for (var link : parsed.getElementsByTag("iframe")) {
linkParser.parseFrame(baseUrl, link).ifPresent(this::addToQueue);
}
for (var link : parsed.getElementsByTag("link")) {
String rel = link.attr("rel");
if (rel.equalsIgnoreCase("next") || rel.equalsIgnoreCase("prev")) {
linkParser.parseLink(baseUrl, link).ifPresent(this::addToQueue);
}
}
}
}

View File

@ -8,6 +8,7 @@ import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import java.nio.file.Path;
import java.util.List;
@ImplementedBy(HttpFetcherImpl.class)

View File

@ -5,22 +5,21 @@ import com.google.inject.name.Named;
import crawlercommons.robots.SimpleRobotRules;
import crawlercommons.robots.SimpleRobotRulesParser;
import lombok.SneakyThrows;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawl.retreival.Cookies;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult;
import nu.marginalia.crawl.retreival.fetcher.socket.*;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import static nu.marginalia.crawl.retreival.fetcher.CrawledDocumentFactory.*;
import static nu.marginalia.crawl.retreival.CrawledDocumentFactory.*;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.contenttype.ContentTypeParser;
import okhttp3.*;
import org.apache.commons.io.input.BOMInputStream;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -34,7 +33,6 @@ import java.nio.charset.IllegalCharsetNameException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
public class HttpFetcherImpl implements HttpFetcher {
@ -45,7 +43,7 @@ public class HttpFetcherImpl implements HttpFetcher {
private static final SimpleRobotRulesParser robotsParser = new SimpleRobotRulesParser();
private final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private final ContentTypeProber contentTypeProber;
@Override
@ -188,14 +186,14 @@ public class HttpFetcherImpl implements HttpFetcher {
}
else if (result instanceof HttpFetchResult.ResultOk ok) {
try {
return extractBody(url, ok);
return extractBody(userAgent, url, ok);
}
catch (Exception ex) {
return createErrorFromException(url, ex);
}
}
else {
throw new IllegalStateException("Unknown result type " + result.getClass());
throw new IllegalStateException(STR."Unknown result type \{result.getClass()}");
}
}
@ -216,7 +214,7 @@ public class HttpFetcherImpl implements HttpFetcher {
};
}
private CrawledDocument extractBody(EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException {
public static CrawledDocument extractBody(String userAgent, EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException {
var responseUrl = new EdgeUrl(rsp.uri());
@ -230,29 +228,6 @@ public class HttpFetcherImpl implements HttpFetcher {
throw new RateLimitException(retryAfter);
}
var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CHARSET, "");
}
if (!isXRobotsTagsPermitted(rsp.allHeaders("X-Robots-Tag"), userAgent)) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name())
@ -264,17 +239,20 @@ public class HttpFetcherImpl implements HttpFetcher {
.build();
}
var strData = DocumentBodyToString.getStringData(contentType, data);
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.OK.name())
.headers(rsp.headers().toString())
.contentType(contentTypeHeader)
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.statusCode())
.url(responseUrl.toString())
.documentBody(strData)
.build();
return switch(DocumentBodyExtractor.extractBody(rsp)) {
case DocumentBodyResult.Error(CrawlerDocumentStatus status, String why) ->
createErrorResponse(url, rsp, status, why);
case DocumentBodyResult.Ok(String contentType, String body) ->
CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.OK.name())
.headers(rsp.headers().toString())
.contentType(contentType)
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.statusCode())
.url(responseUrl.toString())
.documentBody(body)
.build();
};
}
/** Check X-Robots-Tag header tag to see if we are allowed to index this page.

View File

@ -0,0 +1,44 @@
package nu.marginalia.crawl.retreival.fetcher.body;
import nu.marginalia.contenttype.ContentTypeParser;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import org.apache.commons.io.input.BOMInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
public class DocumentBodyExtractor {
private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) throws IOException {
var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, "");
}
return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
}
}

View File

@ -0,0 +1,8 @@
package nu.marginalia.crawl.retreival.fetcher.body;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
public sealed interface DocumentBodyResult {
record Ok(String contentType, String body) implements DocumentBodyResult { }
record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { }
}

View File

@ -1,13 +1,47 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
import okhttp3.Headers;
import org.netpreserve.jwarc.MessageHeaders;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcRevisit;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
public sealed interface HttpFetchResult {
static ResultOk importWarc(WarcResponse response) throws IOException {
var http = response.http();
try (var body = http.body()) {
byte[] bytes = body.stream().readAllBytes();
return new ResultOk(
response.targetURI(),
http.status(),
http.headers(),
bytes,
0,
bytes.length
);
}
}
static ResultOk importWarc(WarcRevisit revisit) throws IOException {
var http = revisit.http();
try (var body = http.body()) {
byte[] bytes = body.stream().readAllBytes();
return new ResultOk(
revisit.targetURI(),
http.status(),
http.headers(),
bytes,
0,
bytes.length
);
}
}
record ResultOk(URI uri,
int statusCode,
Headers headers,
@ -15,6 +49,26 @@ public sealed interface HttpFetchResult {
int bytesStart,
int bytesLength
) implements HttpFetchResult {
public ResultOk(URI uri,
int statusCode,
MessageHeaders headers,
byte[] bytesRaw,
int bytesStart,
int bytesLength) {
this(uri, statusCode, convertHeaders(headers), bytesRaw, bytesStart, bytesLength);
}
private static Headers convertHeaders(MessageHeaders headers) {
var ret = new Headers.Builder();
for (var header : headers.map().entrySet()) {
for (var value : header.getValue()) {
ret.add(header.getKey(), value);
}
}
return ret.build();
}
public InputStream getInputStream() {
return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength);
}
@ -26,6 +80,7 @@ public sealed interface HttpFetchResult {
return headers.values(name);
}
};
record ResultError(Exception ex) implements HttpFetchResult { };
}

View File

@ -34,6 +34,17 @@ public class WarcProtocolReconstructor {
return requestStringBuilder.toString();
}
static String getResponseHeader(String headersAsString, int code) {
String version = "1.1";
String statusCode = String.valueOf(code);
String statusMessage = STATUS_CODE_MAP.getOrDefault(code, "Unknown");
String headerString = getHeadersAsString(headersAsString);
return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n";
}
static String getResponseHeader(Response response) {
String version = response.protocol() == Protocol.HTTP_1_1 ? "1.1" : "2.0";
@ -99,6 +110,13 @@ public class WarcProtocolReconstructor {
Map.entry(511, "Network Authentication Required")
);
static private String getHeadersAsString(String headersBlob) {
StringJoiner joiner = new StringJoiner("\r\n");
Arrays.stream(headersBlob.split("\n")).forEach(joiner::add);
return joiner.toString();
}
static private String getHeadersAsString(Response response) {
StringJoiner joiner = new StringJoiner("\r\n");

View File

@ -1,12 +1,14 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@ -16,6 +18,9 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/** Based on JWarc's fetch method, APL 2.0 license
* <p></p>
@ -24,6 +29,8 @@ import java.time.Instant;
* be reconstructed.
*/
public class WarcRecorder implements AutoCloseable {
public static final URI revisitURI = URI.create("urn:marginalia:revisit");
private static final int MAX_TIME = 30_000;
private static final int MAX_SIZE = 1024 * 1024 * 10;
private final WarcWriter writer;
@ -85,8 +92,6 @@ public class WarcRecorder implements AutoCloseable {
inputStream = body.byteStream();
}
byte[] buf = new byte[8192];
ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response);
String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response);
@ -111,9 +116,6 @@ public class WarcRecorder implements AutoCloseable {
responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n);
totalLength += n;
responseDigestBuilder.update(buf, n);
payloadDigestBuilder.update(buf, n);
if (MAX_TIME > 0 && System.currentTimeMillis() - startMillis > MAX_TIME) {
truncationReason = WarcTruncationReason.TIME;
break;
@ -138,8 +140,6 @@ public class WarcRecorder implements AutoCloseable {
// Build and write the response
long pos = writer.position();
var warcResponse = responseBuilder.build();
warcResponse.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(warcResponse);
@ -174,6 +174,59 @@ public class WarcRecorder implements AutoCloseable {
}
}
public void resync(WarcRecord item) throws IOException {
writer.write(item);
}
/**
* Flag the given URL as skipped by the crawler, so that it will not be retried.
* Which URLs were skipped is still important when resynchronizing on the WARC file,
* so that the crawler can avoid re-fetching them.
*
* @param url The URL to flag
* @param headers
* @param documentBody
*/
public void flagAsSkipped(EdgeUrl url, String headers, int statusCode, String documentBody) {
try {
WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();
String header = WarcProtocolReconstructor.getResponseHeader(headers, statusCode);
ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer();
responseDataBuffer.put(header);
responseDigestBuilder.update(header);
try (var inputStream = new ByteArrayInputStream(documentBody.getBytes())) {
int remainingLength;
while ((remainingLength = responseDataBuffer.remaining()) > 0) {
int startPos = responseDataBuffer.pos();
int n = responseDataBuffer.readFrom(inputStream, remainingLength);
if (n < 0)
break;
responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n);
responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n);
}
}
WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), revisitURI)
.blockDigest(responseDigestBuilder.build())
.payloadDigest(payloadDigestBuilder.build())
.date(Instant.now())
.body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes())
.build();
revisit.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(revisit);
} catch (URISyntaxException | IOException | NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
private class ResponseDataBuffer {
private final byte[] data;

View File

@ -0,0 +1,123 @@
package nu.marginalia.crawl.retreival.revisit;
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainCrawlFrontier;
import nu.marginalia.crawl.retreival.CrawledDocumentFactory;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
import java.util.function.Consumer;
/** This class encapsulates the logic for re-visiting a domain that has already been crawled.
* We may use information from the previous crawl to inform the next crawl, specifically the
* E-Tag and Last-Modified headers.
*/
public class CrawlerRevisitor {
/** recrawlState tag for documents that had a HTTP status 304 */
public static final String documentWasRetainedTag = "RETAINED/304";
/** recrawlState tag for documents that had a 200 status but were identical to a previous version */
public static final String documentWasSameTag = "SAME-BY-COMPARISON";
private final DomainCrawlFrontier crawlFrontier;
private final Consumer<SerializableCrawlData> crawledDomainWriter;
private final CrawlerRetreiver crawlerRetreiver;
private final WarcRecorder warcRecorder;
public CrawlerRevisitor(DomainCrawlFrontier crawlFrontier,
Consumer<SerializableCrawlData> crawledDomainWriter,
CrawlerRetreiver crawlerRetreiver,
WarcRecorder warcRecorder) {
this.crawlFrontier = crawlFrontier;
this.crawledDomainWriter = crawledDomainWriter;
this.crawlerRetreiver = crawlerRetreiver;
this.warcRecorder = warcRecorder;
}
/** Performs a re-crawl of old documents, comparing etags and last-modified */
public int recrawl(CrawlDataReference oldCrawlData,
SimpleRobotRules robotsRules,
CrawlDelayTimer delayTimer) {
int recrawled = 0;
int retained = 0;
for (;;) {
CrawledDocument doc = oldCrawlData.nextDocument();
if (doc == null) {
break;
}
// This Shouldn't Happen (TM)
var urlMaybe = EdgeUrl.parse(doc.url);
if (urlMaybe.isEmpty()) continue;
var url = urlMaybe.get();
// If we've previously 404:d on this URL, we'll refrain from trying to fetch it again
if (doc.httpStatus == 404) {
crawlFrontier.addVisited(url);
continue;
}
if (doc.httpStatus != 200) continue;
if (!robotsRules.isAllowed(url.toString())) {
crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(url));
continue;
}
if (!crawlFrontier.filterLink(url))
continue;
if (!crawlFrontier.addVisited(url))
continue;
if (recrawled > 5
&& retained > 0.9 * recrawled
&& Math.random() < 0.9)
{
// Since it looks like most of these documents haven't changed,
// we'll load the documents directly; but we do this in a random
// fashion to make sure we eventually catch changes over time
// and ensure we discover new links
crawledDomainWriter.accept(doc);
crawlFrontier.addVisited(url);
// Hoover up any links from the document
if (doc.httpStatus == 200 && doc.documentBody != null) {
var parsedDoc = Jsoup.parse(doc.documentBody);
crawlFrontier.enqueueLinksFromDocument(url, parsedDoc);
}
// Add a WARC record so we don't repeat this
warcRecorder.flagAsSkipped(url, doc.headers, doc.httpStatus, doc.documentBody);
continue;
}
// GET the document with the stored document as a reference
// providing etag and last-modified headers, so we can recycle the
// document if it hasn't changed without actually downloading it
var fetchedDocOpt = crawlerRetreiver.fetchWriteAndSleep(url,
delayTimer,
new DocumentWithReference(doc, oldCrawlData));
if (fetchedDocOpt.isEmpty()) continue;
if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
recrawled ++;
}
return recrawled;
}
}

View File

@ -0,0 +1,82 @@
package nu.marginalia.crawl.retreival.revisit;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawling.model.CrawledDocument;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
public record DocumentWithReference(
@Nullable CrawledDocument doc,
@Nullable CrawlDataReference reference) {
private static final DocumentWithReference emptyInstance = new DocumentWithReference(null, null);
public static DocumentWithReference empty() {
return emptyInstance;
}
public boolean isContentBodySame(CrawledDocument newDoc) {
if (reference == null)
return false;
if (doc == null)
return false;
if (doc.documentBody == null)
return false;
if (newDoc.documentBody == null)
return false;
return reference.isContentBodySame(doc, newDoc);
}
public ContentTags getContentTags() {
if (null == doc)
return ContentTags.empty();
String headers = doc.headers;
if (headers == null)
return ContentTags.empty();
String[] headersLines = headers.split("\n");
String lastmod = null;
String etag = null;
for (String line : headersLines) {
if (line.toLowerCase().startsWith("etag:")) {
etag = line.substring(5).trim();
}
if (line.toLowerCase().startsWith("last-modified:")) {
lastmod = line.substring(14).trim();
}
}
return new ContentTags(etag, lastmod);
}
public boolean isEmpty() {
return doc == null || reference == null;
}
/**
* If the provided document has HTTP status 304, and the reference document is provided,
* return the reference document; otherwise return the provided document.
*/
public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) {
if (doc == null)
return fetchedDoc;
// HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when
// we fetched it last time. We can recycle the reference document.
if (fetchedDoc.httpStatus != 304)
return fetchedDoc;
var ret = doc;
ret.recrawlState = CrawlerRevisitor.documentWasRetainedTag;
ret.timestamp = LocalDateTime.now().toString();
return ret;
}
}
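
How the fetcher turns these ContentTags into a conditional request is not part of this diff, but conceptually it boils down to the sketch below; the etag()/lastMod() accessors and the okhttp Request.Builder usage are illustrative assumptions, not code from this commit:

// Illustrative only: send the stored validators with the GET, so a 304 Not Modified
// response lets replaceOn304() recycle the previously stored document.
Request.Builder builder = new Request.Builder().url(url.toString()).get();
ContentTags tags = reference.getContentTags();
if (tags.etag() != null)    builder.addHeader("If-None-Match", tags.etag());
if (tags.lastMod() != null) builder.addHeader("If-Modified-Since", tags.lastMod());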

View File

@ -0,0 +1,71 @@
package nu.marginalia.crawl.retreival.sitemap;
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.retreival.DomainCrawlFrontier;
import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
public class SitemapFetcher {
private final DomainCrawlFrontier crawlFrontier;
private final SitemapRetriever sitemapRetriever;
private static final Logger logger = LoggerFactory.getLogger(SitemapFetcher.class);
public SitemapFetcher(DomainCrawlFrontier crawlFrontier, SitemapRetriever sitemapRetriever) {
this.crawlFrontier = crawlFrontier;
this.sitemapRetriever = sitemapRetriever;
}
public void downloadSitemaps(SimpleRobotRules robotsRules, EdgeUrl rootUrl) {
List<String> sitemaps = robotsRules.getSitemaps();
List<EdgeUrl> urls = new ArrayList<>(sitemaps.size());
if (!sitemaps.isEmpty()) {
for (var url : sitemaps) {
EdgeUrl.parse(url).ifPresent(urls::add);
}
}
else {
urls.add(rootUrl.withPathAndParam("/sitemap.xml", null));
}
downloadSitemaps(urls);
}
public void downloadSitemaps(List<EdgeUrl> urls) {
Set<String> checkedSitemaps = new HashSet<>();
for (var url : urls) {
// Let's not download sitemaps from other domains for now
if (!crawlFrontier.isSameDomain(url)) {
continue;
}
if (checkedSitemaps.contains(url.path))
continue;
var sitemap = sitemapRetriever.fetchSitemap(url);
if (sitemap.isEmpty()) {
continue;
}
// ensure we don't try to download this sitemap again
// (don't move this up, as we may want to check the same
// path with different protocols until we find one that works)
checkedSitemaps.add(url.path);
crawlFrontier.addAllToQueue(sitemap);
}
logger.debug("Queue is now {}", crawlFrontier.queueSize());
}
}

View File

@ -0,0 +1,88 @@
package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.*;
class CrawlerWarcResynchronizerTest {
Path fileName;
Path outputFile;
OkHttpClient httpClient;
@BeforeEach
public void setUp() throws Exception {
httpClient = new OkHttpClient.Builder()
.addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
.build();
fileName = Files.createTempFile("test", ".warc.gz");
outputFile = Files.createTempFile("test", ".warc.gz");
}
@AfterEach
public void tearDown() throws Exception {
Files.deleteIfExists(fileName);
Files.deleteIfExists(outputFile);
}
@Test
void run() throws IOException, URISyntaxException {
try (var oldRecorder = new WarcRecorder(fileName)) {
fetchUrl(oldRecorder, "https://www.marginalia.nu/");
fetchUrl(oldRecorder, "https://www.marginalia.nu/log/");
fetchUrl(oldRecorder, "https://www.marginalia.nu/feed/");
} catch (Exception e) {
fail(e);
}
var crawlFrontier = new DomainCrawlFrontier(new EdgeDomain("www.marginalia.nu"), List.of(), 100);
try (var newRecorder = new WarcRecorder(outputFile)) {
new CrawlerWarcResynchronizer(crawlFrontier, newRecorder).run(fileName);
}
assertTrue(crawlFrontier.isVisited(new EdgeUrl("https://www.marginalia.nu/")));
assertTrue(crawlFrontier.isVisited(new EdgeUrl("https://www.marginalia.nu/log/")));
assertTrue(crawlFrontier.isVisited(new EdgeUrl("https://www.marginalia.nu/feed/")));
try (var warcReader = new WarcReader(outputFile)) {
for (var item : warcReader) {
if (item instanceof WarcRequest req) {
System.out.println("req:" + req.target());
}
if (item instanceof WarcResponse rsp) {
System.out.println("req:" + rsp.target());
}
}
}
new GZIPInputStream(Files.newInputStream(outputFile)).transferTo(System.out);
}
void fetchUrl(WarcRecorder recorder, String url) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
var req = new Request.Builder().url(url)
.addHeader("User-agent", "test.marginalia.nu")
.addHeader("Accept-Encoding", "gzip")
.get().build();
recorder.fetch(httpClient, req);
}
}

View File

@ -2,6 +2,7 @@ package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.junit.jupiter.api.AfterEach;
@ -66,4 +67,33 @@ class WarcRecorderTest {
assertEquals("https://www.marginalia.nu/", sampleData.get("request"));
assertEquals("https://www.marginalia.nu/", sampleData.get("response"));
}
@Test
public void flagAsSkipped() throws IOException, URISyntaxException {
try (var recorder = new WarcRecorder(fileName)) {
recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
"""
Content-type: text/html
X-Cookies: 1
""",
200,
"<?doctype html><html><body>test</body></html>");
}
try (var reader = new WarcReader(fileName)) {
for (var record : reader) {
if (record instanceof WarcResponse rsp) {
assertEquals("https://www.marginalia.nu/", rsp.target());
assertEquals("text/html", rsp.contentType().type());
assertEquals(200, rsp.http().status());
assertEquals("1", rsp.http().headers().first("X-Cookies").orElse(null));
}
}
}
new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out);
}
}

View File

@ -21,6 +21,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@ -151,5 +152,6 @@ public class CrawlerMockFetcherTest {
public SitemapRetriever createSitemapRetriever() {
return Mockito.mock(SitemapRetriever.class);
}
}
}