(crawler) WIP integration of WARC files into the crawler and converter process.

This commit is in a pretty rough state.  It refactors the crawler fairly significantly to offer better separation of concerns.  It replaces the zstd-compressed JSON files used to store crawl data with WARC files entirely, and the converter is modified to consume this data.  This works, -ish.

There appears to be a bug related to reading robots.txt, and the X-Robots-Tag header is no longer processed either.

One problem is that the WARC files are a bit too large.  It will probably be necessary to introduce a new format for long-term storage of the crawl data, something like Parquet, and use WARCs for intermediate storage so that the crawler can be restarted without needing a recrawl.
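For illustration, a minimal sketch (not part of this commit; the file path is hypothetical) of how the converter side can consume the new crawl data. CrawledDomainReader.createDataStream() selects the WARC reader for *.warc / *.warc.gz files and falls back to the legacy zstd reader for *.zstd files, which keeps old crawl data readable during the migration:

import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;

import java.nio.file.Path;

class WarcCrawlDataExample {
    static void listUrls(Path crawlData) throws Exception {
        // The stream must be closed, otherwise it leaks off-heap memory
        try (SerializableCrawlDataStream stream = CrawledDomainReader.createDataStream(crawlData)) {
            while (stream.hasNext()) {
                var record = stream.next();
                if (record instanceof CrawledDomain domain) {
                    // domain-level status, reconstructed from the warcinfo record
                    System.out.println("domain: " + domain);
                }
                else if (record instanceof CrawledDocument doc) {
                    // per-document data, reconstructed from response records
                    System.out.println("document: " + doc.url);
                }
            }
        }
    }
}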
Viktor Lofgren 2023-12-13 15:33:42 +01:00
parent b74a3ebd85
commit 440e097d78
36 changed files with 966 additions and 621 deletions

View File

@ -20,13 +20,18 @@ dependencies {
implementation project(':code:api:index-api')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
implementation project(':code:features-crawl:content-type')
implementation project(':code:libraries:language-processing')
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.jwarc
implementation libs.gson
implementation libs.commons.io
implementation libs.okhttp3
implementation libs.jsoup
implementation libs.snakeyaml
implementation libs.zstd

View File

@ -1,4 +1,4 @@
package nu.marginalia.crawl.retreival.logic;
package nu.marginalia.crawling.body;
import nu.marginalia.model.EdgeUrl;

View File

@ -0,0 +1,60 @@
package nu.marginalia.crawling.body;
import nu.marginalia.contenttype.ContentTypeParser;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import org.apache.commons.io.input.BOMInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
public class DocumentBodyExtractor {
private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class);
public static DocumentBodyResult extractBody(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultOk fetchOk) {
return extractBody(fetchOk);
}
else {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, "");
}
}
public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) {
try {
var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, "");
}
return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
}
catch (IOException ex) {
logger.error("Failed to extract body", ex);
return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, "");
}
}
}

View File

@ -0,0 +1,23 @@
package nu.marginalia.crawling.body;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import java.util.Optional;
import java.util.function.BiFunction;
public sealed interface DocumentBodyResult {
record Ok(String contentType, String body) implements DocumentBodyResult {
@Override
public <T> Optional<T> map(BiFunction<String, String, T> fun) {
return Optional.of(fun.apply(contentType, body));
}
}
record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult {
@Override
public <T> Optional<T> map(BiFunction<String, String, T> fun) {
return Optional.empty();
}
}
<T> Optional<T> map(BiFunction<String, String, T> fun);
}

View File

@ -1,17 +1,23 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
package nu.marginalia.crawling.body;
import okhttp3.Headers;
import org.jsoup.Jsoup;
import org.netpreserve.jwarc.MessageHeaders;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcRevisit;
import org.jsoup.nodes.Document;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
import java.util.Optional;
public sealed interface HttpFetchResult {
boolean isOk();
static ResultOk importWarc(WarcResponse response) throws IOException {
var http = response.http();
try (var body = http.body()) {
@ -27,6 +33,7 @@ public sealed interface HttpFetchResult {
);
}
}
static ResultOk importWarc(WarcRevisit revisit) throws IOException {
var http = revisit.http();
try (var body = http.body()) {
@ -41,7 +48,11 @@ public sealed interface HttpFetchResult {
bytes.length
);
}
finally {
revisit.body().consume();
}
}
record ResultOk(URI uri,
int statusCode,
Headers headers,
@ -50,6 +61,10 @@ public sealed interface HttpFetchResult {
int bytesLength
) implements HttpFetchResult {
public boolean isOk() {
return statusCode >= 200 && statusCode < 300;
}
public ResultOk(URI uri,
int statusCode,
MessageHeaders headers,
@ -73,6 +88,14 @@ public sealed interface HttpFetchResult {
return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength);
}
public Optional<Document> parseDocument() throws IOException {
return switch(DocumentBodyExtractor.extractBody(this)) {
case DocumentBodyResult.Ok ok when "text/html".equalsIgnoreCase(ok.contentType())
-> Optional.of(Jsoup.parse(ok.body()));
default -> Optional.empty();
};
}
public String header(String name) {
return headers.get(name);
}
@ -82,5 +105,34 @@ public sealed interface HttpFetchResult {
};
record ResultError(Exception ex) implements HttpFetchResult { };
record ResultRetained(String url, String body) implements HttpFetchResult {
public boolean isOk() {
return true;
}
public Optional<Document> parseDocument() {
try {
return Optional.of(Jsoup.parse(body));
}
catch (Exception ex) {
return Optional.empty();
}
}
};
record ResultException(Exception ex) implements HttpFetchResult {
public boolean isOk() {
return false;
}
};
record ResultSame() implements HttpFetchResult {
public boolean isOk() {
return false;
}
};
record ResultNone() implements HttpFetchResult {
public boolean isOk() {
return false;
}
};
}

View File

@ -1,156 +1,44 @@
package nu.marginalia.crawling.io;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.io.format.LegacyFileReadingSerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.WarcReadingSerializableCrawlDataStream;
import nu.marginalia.model.gson.GsonFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
public class CrawledDomainReader {
private final Gson gson = GsonFactory.get();
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ForkJoinPool pool = new ForkJoinPool(6);
private static final Gson gson = GsonFactory.get();
public CrawledDomainReader() {
}
/** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
public SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException {
return new FileReadingSerializableCrawlDataStream(gson, fullPath.toFile());
public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".zstd")) {
return new LegacyFileReadingSerializableCrawlDataStream(gson, fullPath.toFile());
}
else if (fileName.endsWith(".warc") || fileName.endsWith(".warc.gz")) {
return new WarcReadingSerializableCrawlDataStream(fullPath);
}
else {
throw new IllegalArgumentException("Unknown file type: " + fullPath);
}
}
/** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
public SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
return createDataStream(CrawlerOutputFile.getOutputFile(basePath, id, domain));
}
public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
Path warcPath = CrawlerOutputFile.getWarcPath(basePath, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
/** Read the entirety of the domain data into memory. This uses a lot of RAM */
public CrawledDomain read(Path path) throws IOException {
DomainDataAssembler domainData = new DomainDataAssembler();
try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(path.toFile()), RecyclingBufferPool.INSTANCE)))) {
String line;
while ((line = br.readLine()) != null) {
if (line.startsWith("//")) {
String identifier = line;
String data = br.readLine();
pool.execute(() -> deserializeLine(identifier, data, domainData));
}
}
if (Files.exists(warcPath)) {
return createDataStream(warcPath);
}
while (!pool.awaitQuiescence(1, TimeUnit.SECONDS));
return domainData.assemble();
}
private void deserializeLine(String identifier, String data, DomainDataAssembler assembler) {
if (null == data) {
return;
}
if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
assembler.acceptDomain(gson.fromJson(data, CrawledDomain.class));
} else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
assembler.acceptDoc(gson.fromJson(data, CrawledDocument.class));
else {
return createDataStream(CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain));
}
}
public Optional<CrawledDomain> readOptionally(Path path) {
try {
return Optional.of(read(path));
}
catch (Exception ex) {
return Optional.empty();
}
}
private static class DomainDataAssembler {
private CrawledDomain domainPrototype;
private final List<CrawledDocument> docs = new ArrayList<>();
public synchronized void acceptDomain(CrawledDomain domain) {
this.domainPrototype = domain;
}
public synchronized void acceptDoc(CrawledDocument doc) {
docs.add(doc);
}
public synchronized CrawledDomain assemble() {
if (!docs.isEmpty()) {
if (domainPrototype.doc == null)
domainPrototype.doc = new ArrayList<>();
domainPrototype.doc.addAll(docs);
}
return domainPrototype;
}
}
private static class FileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private final Gson gson;
private final BufferedReader bufferedReader;
private SerializableCrawlData next = null;
public FileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException {
this.gson = gson;
bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
}
@Override
public SerializableCrawlData next() throws IOException {
if (hasNext()) {
var ret = next;
next = null;
return ret;
}
throw new IllegalStateException("No more data");
}
@Override
public boolean hasNext() throws IOException {
if (next != null)
return true;
String identifier = bufferedReader.readLine();
if (identifier == null) {
bufferedReader.close();
return false;
}
String data = bufferedReader.readLine();
if (data == null) {
bufferedReader.close();
return false;
}
if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDomain.class);
} else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDocument.class);
}
else {
throw new IllegalStateException("Unknown identifier: " + identifier);
}
return true;
}
@Override
public void close() throws Exception {
bufferedReader.close();
}
}
}

View File

@ -55,7 +55,7 @@ public class CrawledDomainWriter implements AutoCloseable {
}
private Path getOutputFile(String id, String name) throws IOException {
return CrawlerOutputFile.createOutputPath(outputDir, id, name);
return CrawlerOutputFile.createLegacyOutputPath(outputDir, id, name);
}
@Override

View File

@ -9,7 +9,11 @@ import java.nio.file.Path;
public class CrawlerOutputFile {
/** Return the Path to a file for the given id and name */
public static Path getOutputFile(Path base, String id, String name) {
public static Path getLegacyOutputFile(Path base, String id, String name) {
if (id.length() < 4) {
id = Strings.repeat("0", 4 - id.length()) + id;
}
String first = id.substring(0, 2);
String second = id.substring(2, 4);
@ -19,7 +23,7 @@ public class CrawlerOutputFile {
/** Return the Path to a file for the given id and name, creating the prerequisite
* directory structure as necessary. */
public static Path createOutputPath(Path base, String id, String name) throws IOException {
public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException {
if (id.length() < 4) {
id = Strings.repeat("0", 4 - id.length()) + id;
}
@ -49,20 +53,37 @@ public class CrawlerOutputFile {
}
public static Path createWarcFile(Path baseDir, String id, String name, WarcFileVersion version) {
public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException {
if (id.length() < 4) {
id = Strings.repeat("0", 4 - id.length()) + id;
}
String fileName = STR."\{id}-\{filesystemSafeName(name)}.zstd\{version.suffix}";
String first = id.substring(0, 2);
String second = id.substring(2, 4);
return baseDir.resolve(fileName);
Path destDir = basePath.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz");
}
public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
if (id.length() < 4) {
id = Strings.repeat("0", 4 - id.length()) + id;
}
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = basePath.resolve(first).resolve(second);
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}");
}
public enum WarcFileVersion {
LIVE(".open"),
TEMP(".tmp"),
FINAL("");
LIVE("open"),
TEMP("tmp"),
FINAL("final");
public final String suffix;

View File

@ -1,11 +1,13 @@
package nu.marginalia.crawling.io;
import nu.marginalia.crawling.model.SerializableCrawlData;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
/** Closable iterator over serialized crawl data
/** Closable iterator over serialized crawl data; the methods may throw IOException.
* The data may appear in any order, and the iterator must be closed.
*
* @see CrawledDomainReader
@ -17,6 +19,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
boolean hasNext() throws IOException;
@Nullable
default Path path() { return null; }
// Dummy iterator over nothing
static SerializableCrawlDataStream empty() {

View File

@ -0,0 +1,70 @@
package nu.marginalia.crawling.io.format;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import java.io.*;
import java.nio.file.Path;
public class LegacyFileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private final Gson gson;
private final BufferedReader bufferedReader;
private SerializableCrawlData next = null;
private final Path path;
public LegacyFileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException {
this.gson = gson;
bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
path = file.toPath();
}
@Override
public Path path() {
return path;
}
@Override
public SerializableCrawlData next() throws IOException {
if (hasNext()) {
var ret = next;
next = null;
return ret;
}
throw new IllegalStateException("No more data");
}
@Override
public boolean hasNext() throws IOException {
if (next != null)
return true;
String identifier = bufferedReader.readLine();
if (identifier == null) {
bufferedReader.close();
return false;
}
String data = bufferedReader.readLine();
if (data == null) {
bufferedReader.close();
return false;
}
if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDomain.class);
} else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
next = gson.fromJson(data, CrawledDocument.class);
} else {
throw new IllegalStateException("Unknown identifier: " + identifier);
}
return true;
}
@Override
public void close() throws Exception {
bufferedReader.close();
}
}

View File

@ -0,0 +1,156 @@
package nu.marginalia.crawling.io.format;
import lombok.SneakyThrows;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.StringJoiner;
public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private static final Logger logger = LoggerFactory.getLogger(WarcReadingSerializableCrawlDataStream.class);
private final WarcReader reader;
private final Iterator<WarcRecord> backingIterator;
private SerializableCrawlData next = null;
private final Path path;
public WarcReadingSerializableCrawlDataStream(Path file) throws IOException {
path = file;
reader = new WarcReader(file);
WarcXResponseReference.register(reader);
backingIterator = reader.iterator();
}
@Override
public Path path() {
return path;
}
@Override
@SneakyThrows
public boolean hasNext() {
while (backingIterator.hasNext() && next == null) {
var nextRecord = backingIterator.next();
if (nextRecord instanceof WarcResponse response) { // this also includes WarcXResponseReference
convertResponse(response);
}
else if (nextRecord instanceof Warcinfo warcinfo) {
convertWarcinfo(warcinfo);
}
else if (nextRecord instanceof WarcMetadata metadata) {
convertMetadata(metadata);
}
}
return next != null;
}
private void convertMetadata(WarcMetadata metadata) {
// Nothing to do here for now
}
private void convertWarcinfo(Warcinfo warcinfo) throws IOException {
var headers = warcinfo.fields();
String probeStatus = headers.first("X-WARC-Probe-Status").orElse("");
String[] parts = probeStatus.split(" ", 2);
String domain = headers.first("domain").orElseThrow(() -> new IllegalStateException("Missing domain header"));
String status = parts[0];
String statusReason = parts.length > 1 ? parts[1] : "";
String ip = headers.first("ip").orElse("");
String redirectDomain = null;
if ("REDIRECT".equalsIgnoreCase(status)) {
redirectDomain = statusReason;
}
// TODO: Fix cookies info somehow
next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip, List.of(), List.of());
}
private void convertResponse(WarcResponse response) throws IOException {
var http = response.http();
if (http.status() != 200) {
return;
}
CrawledDocument document;
var parsedBody = DocumentBodyExtractor.extractBody(HttpFetchResult.importWarc(response));
if (parsedBody instanceof DocumentBodyResult.Error error) {
next = new CrawledDocument(
"",
response.targetURI().toString(),
http.contentType().raw(),
response.date().toString(),
http.status(),
error.status().toString(),
error.why(),
headers(http.headers()),
null,
response.payloadDigest().map(WarcDigest::base64).orElse(""),
"",
"",
"");
} else if (parsedBody instanceof DocumentBodyResult.Ok ok) {
next = new CrawledDocument(
"",
response.targetURI().toString(),
ok.contentType(),
response.date().toString(),
http.status(),
"OK",
"",
headers(http.headers()),
ok.body(),
response.payloadDigest().map(WarcDigest::base64).orElse(""),
"",
"",
"");
} else {
// unreachable
throw new IllegalStateException("Unknown body type: " + parsedBody);
}
}
public String headers(MessageHeaders headers) {
StringJoiner ret = new StringJoiner("\n");
for (var header : headers.map().entrySet()) {
for (var value : header.getValue()) {
ret.add(STR."\{header.getKey()}: \{value}");
}
}
return ret.toString();
}
public void close() throws IOException {
reader.close();
}
@Override
public SerializableCrawlData next() throws IOException {
if (!hasNext())
throw new NoSuchElementException();
try {
return next;
}
finally {
next = null;
}
}
}

View File

@ -0,0 +1,44 @@
package org.netpreserve.jwarc;
import java.io.IOException;
import java.net.URI;
/** This defines a non-standard extension to WARC for storing old HTTP responses,
* essentially a 'revisit' with a full body, which is not something that is
* expected by the jwarc parser, and goes against the semantics of the revisit
* records a fair bit.
* <p>
* An x-response-reference record is a response record with a full body, where
* the data is a reconstructed HTTP response from a previous crawl.
*/
public class WarcXResponseReference extends WarcResponse {
private static final String TYPE_NAME = "x-response-reference";
WarcXResponseReference(MessageVersion version, MessageHeaders headers, MessageBody body) {
super(version, headers, body);
}
public static void register(WarcReader reader) {
reader.registerType(TYPE_NAME, WarcXResponseReference::new);
}
public static class Builder extends AbstractBuilder<WarcXResponseReference, Builder> {
public Builder(URI targetURI) {
this(targetURI.toString());
}
public Builder(String targetURI) {
super(TYPE_NAME);
setHeader("WARC-Target-URI", targetURI);
}
public Builder body(HttpResponse httpResponse) throws IOException {
return body(MediaType.HTTP_RESPONSE, httpResponse);
}
@Override
public WarcXResponseReference build() {
return build(WarcXResponseReference::new);
}
}
}
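
A minimal usage sketch (not part of this diff; the file path is hypothetical) of why register() matters on the read side: once the extension type is registered, x-response-reference records are materialized as WarcResponse subclasses and flow through the same branch as ordinary responses, which is what WarcReadingSerializableCrawlDataStream relies on:

import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcXResponseReference;

import java.io.IOException;
import java.nio.file.Path;

class XResponseReferenceReadExample {
    static void printResponseUris(Path warcFile) throws IOException {
        try (WarcReader reader = new WarcReader(warcFile)) {
            // Without this call, jwarc would not know how to parse the
            // non-standard x-response-reference record type.
            WarcXResponseReference.register(reader);

            for (var record : reader) {
                // WarcXResponseReference extends WarcResponse, so genuine responses
                // and reference copies are both handled here.
                if (record instanceof WarcResponse response) {
                    System.out.println(response.targetURI());
                }
            }
        }
    }
}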

View File

@ -74,23 +74,13 @@ public class CrawlPlan {
return count;
}
@Deprecated
public Iterable<CrawledDomain> domainsIterable() {
final CrawledDomainReader reader = new CrawledDomainReader();
return WorkLog.iterableMap(crawl.getLogFile(),
entry -> {
var path = getCrawledFilePath(entry.path());
if (!Files.exists(path)) {
logger.warn("File not found: {}", path);
return Optional.empty();
}
return reader.readOptionally(path);
});
// This is no longer supported
throw new UnsupportedOperationException();
}
public Iterable<SerializableCrawlDataStream> crawlDataIterable(Predicate<String> idPredicate) {
final CrawledDomainReader reader = new CrawledDomainReader();
return WorkLog.iterableMap(crawl.getLogFile(),
entry -> {
if (!idPredicate.test(entry.id())) {
@ -105,7 +95,7 @@ public class CrawlPlan {
}
try {
return Optional.of(reader.createDataStream(path));
return Optional.of(CrawledDomainReader.createDataStream(path));
}
catch (IOException ex) {
return Optional.empty();

View File

@ -79,7 +79,7 @@ public class CrawlingThenConvertingIntegrationTest {
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch();
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
CrawledDomain domain = data.stream().filter(CrawledDomain.class::isInstance).map(CrawledDomain.class::cast).findFirst().get();

View File

@ -62,7 +62,6 @@ public class CrawlerMain {
private final SimpleBlockingThreadPool pool;
private final Map<String, String> processingIds = new ConcurrentHashMap<>();
private final CrawledDomainReader reader = new CrawledDomainReader();
final AbortMonitor abortMonitor = AbortMonitor.getInstance();
@ -142,6 +141,7 @@ public class CrawlerMain {
public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException {
heartbeat.start();
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
) {
@ -213,9 +213,9 @@ public class CrawlerMain {
@Override
public void run() throws Exception {
Path newWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path finalWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
if (Files.exists(newWarcFile)) {
Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
@ -224,9 +224,8 @@ public class CrawlerMain {
Files.deleteIfExists(tempFile);
}
try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id);
var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
var retreiver = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder, writer::accept);
try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now
var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder);
CrawlDataReference reference = getReference())
{
Thread.currentThread().setName("crawling:" + domain);
@ -234,39 +233,37 @@ public class CrawlerMain {
var domainLinks = anchorTagsSource.getAnchorTags(domain);
if (Files.exists(tempFile)) {
retreiver.syncAbortedRun(tempFile);
retriever.syncAbortedRun(tempFile);
Files.delete(tempFile);
}
int size = retreiver.fetch(domainLinks, reference);
int size = retriever.fetch(domainLinks, reference);
// Delete the reference crawl data if it's not the same as the new one
// (mostly a case when migrating from legacy->warc)
reference.delete();
Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING);
workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size);
workLog.setJobToFinished(domain, finalWarcFile.toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);
} catch (Exception e) {
logger.error("Error fetching domain " + domain, e);
Files.deleteIfExists(newWarcFile);
if (tempFile != null) {
Files.deleteIfExists(tempFile);
}
Files.deleteIfExists(tempFile);
}
finally {
// We don't need to double-count these; it's also kept in the workLog
processingIds.remove(domain);
Thread.currentThread().setName("[idle]");
// FIXME: Remove this when we're done
Files.deleteIfExists(finalWarcFile);
}
}
private CrawlDataReference getReference() {
try {
var dataStream = reader.createDataStream(outputDir, domain, id);
return new CrawlDataReference(dataStream);
return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
} catch (IOException e) {
logger.debug("Failed to read previous crawl data for {}", specification.domain);
return new CrawlDataReference();

View File

@ -8,6 +8,8 @@ import nu.marginalia.lsh.EasyLSH;
import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
/** A reference to a domain that has been crawled before. */
public class CrawlDataReference implements AutoCloseable {
@ -22,6 +24,15 @@ public class CrawlDataReference implements AutoCloseable {
this(SerializableCrawlDataStream.empty());
}
/** Delete the associated data from disk, if it exists */
public void delete() throws IOException {
Path filePath = data.path();
if (filePath != null) {
Files.deleteIfExists(filePath);
}
}
@Nullable
public CrawledDocument nextDocument() {
try {
@ -37,12 +48,10 @@ public class CrawlDataReference implements AutoCloseable {
return null;
}
public boolean isContentBodySame(CrawledDocument one, CrawledDocument other) {
assert one.documentBody != null;
assert other.documentBody != null;
public boolean isContentBodySame(String one, String other) {
final long contentHashOne = contentHash(one.documentBody);
final long contentHashOther = contentHash(other.documentBody);
final long contentHashOne = contentHash(one);
final long contentHashOther = contentHash(other);
return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4;
}

View File

@ -1,6 +1,6 @@
package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.model.EdgeUrl;

View File

@ -7,7 +7,7 @@ import lombok.SneakyThrows;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor;
import nu.marginalia.crawl.retreival.revisit.DocumentWithReference;
@ -18,16 +18,15 @@ import nu.marginalia.ip_blocklist.UrlBlocklist;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.jsoup.Jsoup;
import org.jsoup.nodes.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.util.*;
import java.util.function.Consumer;
public class CrawlerRetreiver implements AutoCloseable {
@ -36,7 +35,6 @@ public class CrawlerRetreiver implements AutoCloseable {
private final HttpFetcher fetcher;
private final String domain;
private final Consumer<SerializableCrawlData> crawledDomainWriter;
private static final LinkParser linkParser = new LinkParser();
private static final Logger logger = LoggerFactory.getLogger(CrawlerRetreiver.class);
@ -56,8 +54,7 @@ public class CrawlerRetreiver implements AutoCloseable {
public CrawlerRetreiver(HttpFetcher fetcher,
DomainProber domainProber,
CrawlSpecRecord specs,
WarcRecorder warcRecorder,
Consumer<SerializableCrawlData> writer)
WarcRecorder warcRecorder)
{
this.warcRecorder = warcRecorder;
this.fetcher = fetcher;
@ -65,11 +62,8 @@ public class CrawlerRetreiver implements AutoCloseable {
domain = specs.domain;
crawledDomainWriter = writer;
crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth);
crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, crawledDomainWriter, this, warcRecorder);
crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, this, warcRecorder);
sitemapFetcher = new SitemapFetcher(crawlFrontier, fetcher.createSitemapRetriever());
// We must always crawl the index page first; this is assumed when fingerprinting the server
@ -94,32 +88,13 @@ public class CrawlerRetreiver implements AutoCloseable {
public int fetch(DomainLinks domainLinks, CrawlDataReference oldCrawlData) {
final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek());
return switch (probeResult) {
case DomainProber.ProbeResultOk(EdgeUrl probedUrl) -> crawlDomain(oldCrawlData, probedUrl, domainLinks);
case DomainProber.ProbeResultError(CrawlerDomainStatus status, String desc) -> {
crawledDomainWriter.accept(
CrawledDomain.builder()
.crawlerStatus(status.name())
.crawlerStatusDesc(desc)
.domain(domain)
.ip(findIp(domain))
.build()
);
yield 1;
}
case DomainProber.ProbeResultRedirect(EdgeDomain redirectDomain) -> {
crawledDomainWriter.accept(
CrawledDomain.builder()
.crawlerStatus(CrawlerDomainStatus.REDIRECT.name())
.crawlerStatusDesc("Redirected to different domain")
.redirectDomain(redirectDomain.toString())
.domain(domain)
.ip(findIp(domain))
.build()
);
yield 1;
}
};
try {
return crawlDomain(oldCrawlData, probeResult, domainLinks);
}
catch (Exception ex) {
logger.error("Error crawling domain {}", domain, ex);
return 0;
}
}
public void syncAbortedRun(Path warcFile) {
@ -128,9 +103,21 @@ public class CrawlerRetreiver implements AutoCloseable {
resync.run(warcFile);
}
private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) {
private int crawlDomain(CrawlDataReference oldCrawlData, DomainProber.ProbeResult probeResult, DomainLinks domainLinks) throws IOException {
String ip = findIp(domain);
EdgeUrl rootUrl;
warcRecorder.writeWarcinfoHeader(ip, new EdgeDomain(domain), probeResult);
if (!(probeResult instanceof DomainProber.ProbeResultOk ok)) {
return 1;
}
else {
rootUrl = ok.probedUrl();
}
assert !crawlFrontier.isEmpty();
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain, warcRecorder);
@ -170,7 +157,7 @@ public class CrawlerRetreiver implements AutoCloseable {
var top = crawlFrontier.takeNextUrl();
if (!robotsRules.isAllowed(top.toString())) {
crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(top));
warcRecorder.flagAsRobotsTxtError(top);
continue;
}
@ -193,15 +180,13 @@ public class CrawlerRetreiver implements AutoCloseable {
continue;
if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isPresent()) {
if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isOk()) {
fetchedCount++;
}
}
ret.cookies = fetcher.getCookies();
crawledDomainWriter.accept(ret);
return fetchedCount;
}
@ -216,16 +201,16 @@ public class CrawlerRetreiver implements AutoCloseable {
var url = rootUrl.withPathAndParam("/", null);
var maybeSample = fetchUrl(url, delayTimer, DocumentWithReference.empty()).filter(sample -> sample.httpStatus == 200);
if (maybeSample.isEmpty())
var result = tryDownload(url, delayTimer, ContentTags.empty());
if (!(result instanceof HttpFetchResult.ResultOk ok))
return;
var sample = maybeSample.get();
if (sample.documentBody == null)
var optDoc = ok.parseDocument();
if (optDoc.isEmpty())
return;
// Sniff the software based on the sample document
var doc = Jsoup.parse(sample.documentBody);
var doc = optDoc.get();
crawlFrontier.setLinkFilter(linkFilterSelector.selectFilter(doc));
for (var link : doc.getElementsByTag("link")) {
@ -252,41 +237,54 @@ public class CrawlerRetreiver implements AutoCloseable {
}
}
public Optional<CrawledDocument> fetchWriteAndSleep(EdgeUrl top,
public HttpFetchResult fetchWriteAndSleep(EdgeUrl top,
CrawlDelayTimer timer,
DocumentWithReference reference) {
logger.debug("Fetching {}", top);
long startTime = System.currentTimeMillis();
var docOpt = fetchUrl(top, timer, reference);
var contentTags = reference.getContentTags();
var fetchedDoc = tryDownload(top, timer, contentTags);
if (docOpt.isPresent()) {
var doc = docOpt.get();
if (!Objects.equals(doc.recrawlState, CrawlerRevisitor.documentWasRetainedTag)
&& reference.isContentBodySame(doc))
{
// The document didn't change since the last time
doc.recrawlState = CrawlerRevisitor.documentWasSameTag;
if (fetchedDoc instanceof HttpFetchResult.ResultSame) {
var doc = reference.doc();
if (doc != null) {
warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody);
fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.documentBody);
}
}
crawledDomainWriter.accept(doc);
try {
if (fetchedDoc instanceof HttpFetchResult.ResultOk ok) {
var docOpt = ok.parseDocument();
if (docOpt.isPresent()) {
var doc = docOpt.get();
if (doc.url != null) {
// We may have redirected to a different path
EdgeUrl.parse(doc.url).ifPresent(crawlFrontier::addVisited);
crawlFrontier.enqueueLinksFromDocument(top, doc);
crawlFrontier.addVisited(new EdgeUrl(ok.uri()));
}
}
else if (fetchedDoc instanceof HttpFetchResult.ResultRetained retained) {
var docOpt = retained.parseDocument();
if (docOpt.isPresent()) {
var doc = docOpt.get();
if ("ERROR".equals(doc.crawlerStatus) && doc.httpStatus != 404) {
errorCount++;
crawlFrontier.enqueueLinksFromDocument(top, doc);
EdgeUrl.parse(retained.url()).ifPresent(crawlFrontier::addVisited);
}
}
else if (fetchedDoc instanceof HttpFetchResult.ResultException ex) {
errorCount ++;
}
}
catch (Exception ex) {
logger.error("Error parsing document {}", top, ex);
}
timer.delay(System.currentTimeMillis() - startTime);
return docOpt;
return fetchedDoc;
}
private boolean isAllowedProtocol(String proto) {
@ -294,42 +292,11 @@ public class CrawlerRetreiver implements AutoCloseable {
|| proto.equalsIgnoreCase("https");
}
private Optional<CrawledDocument> fetchUrl(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) {
try {
var contentTags = reference.getContentTags();
var fetchedDoc = tryDownload(top, timer, contentTags);
CrawledDocument doc = reference.replaceOn304(fetchedDoc);
if (doc.documentBody != null) {
doc.documentBodyHash = createHash(doc.documentBody);
var parsedDoc = Jsoup.parse(doc.documentBody);
EdgeUrl url = new EdgeUrl(doc.url);
crawlFrontier.enqueueLinksFromDocument(url, parsedDoc);
findCanonicalUrl(url, parsedDoc)
.ifPresent(canonicalLink -> doc.canonicalUrl = canonicalLink.toString());
}
return Optional.of(doc);
}
catch (Exception ex) {
logger.warn("Failed to process document {}", top);
}
return Optional.empty();
}
@SneakyThrows
private CrawledDocument tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) {
private HttpFetchResult tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) {
for (int i = 0; i < 2; i++) {
try {
var doc = fetcher.fetchContent(top, warcRecorder, tags);
doc.recrawlState = "NEW";
return doc;
return fetcher.fetchContent(top, warcRecorder, tags);
}
catch (RateLimitException ex) {
timer.slowDown();
@ -339,15 +306,20 @@ public class CrawlerRetreiver implements AutoCloseable {
Thread.sleep(delay);
}
}
catch (Exception ex) {
logger.warn("Failed to fetch {}", top, ex);
return new HttpFetchResult.ResultException(ex);
}
}
return CrawledDocumentFactory.createRetryError(top);
return new HttpFetchResult.ResultNone();
}
private String createHash(String documentBodyHash) {
return hashMethod.hashUnencodedChars(documentBodyHash).toString();
}
// FIXME this does not belong in the crawler
private Optional<EdgeUrl> findCanonicalUrl(EdgeUrl baseUrl, Document parsed) {
baseUrl = baseUrl.domain.toRootUrl();

View File

@ -1,8 +1,8 @@
package nu.marginalia.crawl.retreival;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
@ -87,7 +87,7 @@ public class CrawlerWarcResynchronizer {
}
private void revisit(WarcRevisit revisit) throws IOException {
if (!WarcRecorder.revisitURI.equals(revisit.profile())) {
if (!WarcRecorder.documentRevisitURN.equals(revisit.profile())) {
return;
}

View File

@ -1,6 +1,6 @@
package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.crawling.body.ContentTypeLogic;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;

View File

@ -3,12 +3,11 @@ package nu.marginalia.crawl.retreival.fetcher;
import com.google.inject.ImplementedBy;
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import java.nio.file.Path;
import java.util.List;
@ImplementedBy(HttpFetcherImpl.class)
@ -20,7 +19,7 @@ public interface HttpFetcher {
FetchResult probeDomain(EdgeUrl url);
CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException;
HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException;
SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder);

View File

@ -8,30 +8,26 @@ import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.Cookies;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor;
import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult;
import nu.marginalia.crawl.retreival.fetcher.socket.*;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import static nu.marginalia.crawl.retreival.CrawledDocumentFactory.*;
import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.crawling.body.ContentTypeLogic;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import okhttp3.*;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.net.ssl.SSLException;
import javax.net.ssl.X509TrustManager;
import java.io.EOFException;
import java.io.IOException;
import java.net.*;
import java.nio.charset.IllegalCharsetNameException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@ -141,9 +137,9 @@ public class HttpFetcherImpl implements HttpFetcher {
@Override
@SneakyThrows
public CrawledDocument fetchContent(EdgeUrl url,
WarcRecorder warcRecorder,
ContentTags contentTags)
public HttpFetchResult fetchContent(EdgeUrl url,
WarcRecorder warcRecorder,
ContentTags contentTags)
throws RateLimitException
{
@ -152,23 +148,21 @@ public class HttpFetcherImpl implements HttpFetcher {
if (contentTags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url))
{
ContentTypeProbeResult probeResult = contentTypeProber.probeContentType(url);
switch (probeResult) {
case ContentTypeProbeResult.Ok(EdgeUrl redirectUrl) -> {
url = redirectUrl;
}
case ContentTypeProbeResult.BadContentType (String contentType, int statusCode) -> {
return createErrorResponse(url, contentType, statusCode,
CrawlerDocumentStatus.BAD_CONTENT_TYPE,
contentType
);
}
case ContentTypeProbeResult.Timeout timeout -> {
return createTimeoutErrorRsp(url);
}
case ContentTypeProbeResult.Exception ex -> {
return createErrorFromException(url, ex.ex());
}
};
if (probeResult instanceof ContentTypeProbeResult.Ok ok) {
url = ok.resolvedUrl();
}
else if (probeResult instanceof ContentTypeProbeResult.BadContentType badContentType) {
warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode());
return new HttpFetchResult.ResultNone();
}
else if (probeResult instanceof ContentTypeProbeResult.BadContentType.Timeout timeout) {
warcRecorder.flagAsTimeout(url);
return new HttpFetchResult.ResultNone();
}
else if (probeResult instanceof ContentTypeProbeResult.Exception exception) {
warcRecorder.flagAsError(url, exception.ex());
return new HttpFetchResult.ResultNone();
}
}
var getBuilder = new Request.Builder().get();
@ -181,78 +175,20 @@ public class HttpFetcherImpl implements HttpFetcher {
HttpFetchResult result = warcRecorder.fetch(client, getBuilder.build());
if (result instanceof HttpFetchResult.ResultError err) {
return createErrorFromException(url, err.ex());
}
else if (result instanceof HttpFetchResult.ResultOk ok) {
try {
return extractBody(userAgent, url, ok);
if (result instanceof HttpFetchResult.ResultOk ok) {
if (ok.statusCode() == 429) {
String retryAfter = Objects.requireNonNullElse(ok.header("Retry-After"), "1000");
throw new RateLimitException(retryAfter);
}
catch (Exception ex) {
return createErrorFromException(url, ex);
if (ok.statusCode() == 304) {
return new HttpFetchResult.ResultSame();
}
if (ok.statusCode() == 200) {
return ok;
}
}
else {
throw new IllegalStateException(STR."Unknown result type \{result.getClass()}");
}
}
private CrawledDocument createErrorFromException(EdgeUrl url, Exception exception) throws RateLimitException {
return switch (exception) {
case RateLimitException rle -> throw rle;
case SocketTimeoutException ex -> createTimeoutErrorRsp(url);
case UnknownHostException ex -> createUnknownHostError(url);
case SocketException ex -> createHardErrorRsp(url, ex);
case ProtocolException ex -> createHardErrorRsp(url, ex);
case IllegalCharsetNameException ex -> createHardErrorRsp(url, ex);
case SSLException ex -> createHardErrorRsp(url, ex);
case EOFException ex -> createHardErrorRsp(url, ex);
default -> {
logger.error("Error during fetching", exception);
yield createHardErrorRsp(url, exception);
}
};
}
public static CrawledDocument extractBody(String userAgent, EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException {
var responseUrl = new EdgeUrl(rsp.uri());
if (!Objects.equals(responseUrl.domain, url.domain)) {
return createRedirectResponse(url, rsp, responseUrl);
}
if (rsp.statusCode() == 429) {
String retryAfter = Objects.requireNonNullElse(rsp.header("Retry-After"), "1000");
throw new RateLimitException(retryAfter);
}
if (!isXRobotsTagsPermitted(rsp.allHeaders("X-Robots-Tag"), userAgent)) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name())
.crawlerStatusDesc("X-Robots-Tag")
.url(responseUrl.toString())
.httpStatus(-1)
.timestamp(LocalDateTime.now().toString())
.headers(rsp.headers().toString())
.build();
}
return switch(DocumentBodyExtractor.extractBody(rsp)) {
case DocumentBodyResult.Error(CrawlerDocumentStatus status, String why) ->
createErrorResponse(url, rsp, status, why);
case DocumentBodyResult.Ok(String contentType, String body) ->
CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.OK.name())
.headers(rsp.headers().toString())
.contentType(contentType)
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.statusCode())
.url(responseUrl.toString())
.documentBody(body)
.build();
};
return new HttpFetchResult.ResultNone();
}
/** Check X-Robots-Tag header tag to see if we are allowed to index this page.
@ -318,17 +254,31 @@ public class HttpFetcherImpl implements HttpFetcher {
private Optional<SimpleRobotRules> fetchRobotsForProto(String proto, WarcRecorder recorder, EdgeDomain domain) {
try {
var url = new EdgeUrl(proto, domain, null, "/robots.txt", null);
return Optional.of(parseRobotsTxt(fetchContent(url, recorder, ContentTags.empty())));
var getBuilder = new Request.Builder().get();
getBuilder.url(url.toString())
.addHeader("Accept-Encoding", "gzip")
.addHeader("User-agent", userAgent);
HttpFetchResult result = recorder.fetch(client, getBuilder.build());
if (result instanceof HttpFetchResult.ResultOk ok) {
return Optional.of(parseRobotsTxt(ok));
}
else {
return Optional.empty();
}
}
catch (Exception ex) {
return Optional.empty();
}
}
private SimpleRobotRules parseRobotsTxt(CrawledDocument doc) {
return robotsParser.parseContent(doc.url,
doc.documentBody.getBytes(),
doc.contentType,
private SimpleRobotRules parseRobotsTxt(HttpFetchResult.ResultOk ok) {
return robotsParser.parseContent(ok.uri().toString(),
ok.bytesRaw(),
ok.header("Content-Type"),
userAgent);
}

View File

@ -1,44 +0,0 @@
package nu.marginalia.crawl.retreival.fetcher.body;
import nu.marginalia.contenttype.ContentTypeParser;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import org.apache.commons.io.input.BOMInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
public class DocumentBodyExtractor {
private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) throws IOException {
var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, "");
}
return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
}
}

View File

@ -1,8 +0,0 @@
package nu.marginalia.crawl.retreival.fetcher.body;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
public sealed interface DocumentBodyResult {
record Ok(String contentType, String body) implements DocumentBodyResult { }
record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { }
}

View File

@ -1,6 +1,9 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
@ -8,7 +11,6 @@ import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
@ -18,9 +20,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
/** Based on JWarc's fetch method, APL 2.0 license
* <p></p>
@ -29,7 +29,12 @@ import java.util.Map;
* be reconstructed.
*/
public class WarcRecorder implements AutoCloseable {
public static final URI revisitURI = URI.create("urn:marginalia:revisit");
public static final URI documentRevisitURN = URI.create("urn:marginalia/data/doc/revisit");
public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped");
public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe");
public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe");
public static final URI documentUnspecifiedError = URI.create("urn:marginalia/meta/doc/error");
private static final int MAX_TIME = 30_000;
private static final int MAX_SIZE = 1024 * 1024 * 10;
@ -37,10 +42,14 @@ public class WarcRecorder implements AutoCloseable {
private final Path warcFile;
private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class);
private ThreadLocal<byte[]> bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]);
private final ThreadLocal<byte[]> bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]);
private boolean temporaryFile = false;
// Affix a version string in case we need to change the format in the future
// in some way
private final String warcRecorderVersion = "1.0";
/**
* Create a new WarcRecorder that will write to the given file
*
@ -48,7 +57,7 @@ public class WarcRecorder implements AutoCloseable {
*/
public WarcRecorder(Path warcFile) throws IOException {
this.warcFile = warcFile;
this.writer = new WarcWriter(this.warcFile);
this.writer = new WarcWriter(warcFile);
}
/**
@ -170,7 +179,7 @@ public class WarcRecorder implements AutoCloseable {
}
catch (Exception ex) {
logger.warn("Failed to fetch URL {}", uri, ex);
return new HttpFetchResult.ResultError(ex);
return new HttpFetchResult.ResultException(ex);
}
}
@ -178,55 +187,141 @@ public class WarcRecorder implements AutoCloseable {
writer.write(item);
}
/**
* Flag the given URL as skipped by the crawler, so that it will not be retried.
* Which URLs were skipped is still important when resynchronizing on the WARC file,
* so that the crawler can avoid re-fetching them.
*
* @param url The URL to flag
* @param headers
* @param documentBody
*/
public void flagAsSkipped(EdgeUrl url, String headers, int statusCode, String documentBody) {
private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, String documentBody) {
try {
WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();
String header = WarcProtocolReconstructor.getResponseHeader(headers, statusCode);
byte[] bytes = documentBody.getBytes();
String fakeHeaders = STR."""
Content-Type: \{contentType}
Content-Length: \{bytes.length}
Content-Encoding: UTF-8
""";
String header = WarcProtocolReconstructor.getResponseHeader(fakeHeaders, statusCode);
ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer();
responseDataBuffer.put(header);
responseDigestBuilder.update(header);
try (var inputStream = new ByteArrayInputStream(documentBody.getBytes())) {
int remainingLength;
while ((remainingLength = responseDataBuffer.remaining()) > 0) {
int startPos = responseDataBuffer.pos();
responseDigestBuilder.update(bytes, bytes.length);
payloadDigestBuilder.update(bytes, bytes.length);
responseDataBuffer.put(bytes, 0, bytes.length);
int n = responseDataBuffer.readFrom(inputStream, remainingLength);
if (n < 0)
break;
responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n);
responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n);
}
}
WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), revisitURI)
WarcXResponseReference reference = new WarcXResponseReference.Builder(url.asURI())
.blockDigest(responseDigestBuilder.build())
.payloadDigest(payloadDigestBuilder.build())
.date(Instant.now())
.body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes())
.build();
revisit.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
reference.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(revisit);
writer.write(reference);
} catch (URISyntaxException | IOException | NoSuchAlgorithmException e) {
throw new RuntimeException(e);
}
}
/**
* Flag the given URL as skipped by the crawler, so that it will not be retried.
* Which URLs were skipped is still important when resynchronizing on the WARC file,
* so that the crawler can avoid re-fetching them.
*/
public void flagAsSkipped(EdgeUrl url, String contentType, int statusCode, String documentBody) {
saveOldResponse(url, contentType, statusCode, documentBody);
}
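A sketch of the resynchronization side this javadoc refers to, mirroring the reader loop in the new WarcRecorderTest below: the synthetic records written by flagAsSkipped (and writeReferenceCopy) surface as WarcXResponseReference once the custom record type is registered with the jwarc reader. warcFile is assumed to be the path the recorder wrote to:

    // Hypothetical read-back of skipped/reference records
    try (var reader = new WarcReader(warcFile)) {
        WarcXResponseReference.register(reader);
        for (var record : reader) {
            if (record instanceof WarcXResponseReference skipped) {
                System.out.println("already crawled: " + skipped.target());
            }
        }
    }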
/**
* Write a reference copy of the given document data. This is used when the crawler provides
* an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this
* scenario we want to record the data as it was in the previous crawl, but not re-fetch it.
*/
public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, String documentBody) {
saveOldResponse(url, contentType, statusCode, documentBody);
}
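A sketch of the 304 flow described above. Whether the caller or the retriever issues this call is an assumption of the sketch; reference, oldDoc, fetcher, url and recorder stand in for objects shown elsewhere in this commit, and ResultSame is taken to be the not-modified signal, following DocumentWithReference.isSame further down:

    // Conditional re-fetch: replay E-Tag / Last-Modified, keep the old body on 304
    ContentTags tags = reference.getContentTags();
    HttpFetchResult result = fetcher.fetchContent(url, recorder, tags);
    if (result instanceof HttpFetchResult.ResultSame) {
        // record the previously crawled copy instead of a fresh response
        recorder.writeReferenceCopy(url, oldDoc.contentType, oldDoc.httpStatus, oldDoc.documentBody);
    }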
public void writeWarcinfoHeader(String ip, EdgeDomain domain, DomainProber.ProbeResult result) throws IOException {
Map<String, List<String>> fields = new HashMap<>();
fields.put("ip", List.of(ip));
fields.put("software", List.of(STR."search.marginalia.nu/\{warcRecorderVersion}"));
fields.put("domain", List.of(domain.toString()));
switch (result) {
case DomainProber.ProbeResultRedirect redirectDomain:
fields.put("X-WARC-Probe-Status", List.of(STR."REDIRECT;\{redirectDomain.domain()}"));
break;
case DomainProber.ProbeResultError error:
fields.put("X-WARC-Probe-Status", List.of(STR."\{error.status().toString()};\{error.desc()}"));
break;
case DomainProber.ProbeResultOk ok:
fields.put("X-WARC-Probe-Status", List.of("OK"));
break;
}
var warcinfo = new Warcinfo.Builder()
.date(Instant.now())
.fields(fields)
.recordId(UUID.randomUUID())
.build();
writer.write(warcinfo);
}
public void flagAsRobotsTxtError(EdgeUrl top) {
try {
WarcRevisit revisit = new WarcRevisit.Builder(top.asURI(), documentRobotsTxtSkippedURN)
.date(Instant.now())
.build();
writer.write(revisit);
} catch (URISyntaxException | IOException e) {
throw new RuntimeException(e);
}
}
public void flagAsFailedContentTypeProbe(EdgeUrl url, String contentType, int status) {
try {
WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentBadContentTypeURN)
.date(Instant.now())
.addHeader("Rejected-Content-Type", contentType)
.addHeader("Http-Status", Integer.toString(status))
.build();
writer.write(revisit);
} catch (URISyntaxException | IOException e) {
throw new RuntimeException(e);
}
}
public void flagAsError(EdgeUrl url, Exception ex) {
try {
WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentUnspecifiedError)
.date(Instant.now())
.addHeader("Exception", ex.getClass().getSimpleName())
.addHeader("ErrorMessage", Objects.requireNonNullElse(ex.getMessage(), ""))
.build();
writer.write(revisit);
} catch (URISyntaxException | IOException e) {
throw new RuntimeException(e);
}
}
public void flagAsTimeout(EdgeUrl url) {
try {
WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentProbeTimeout)
.date(Instant.now())
.build();
writer.write(revisit);
} catch (URISyntaxException | IOException e) {
throw new RuntimeException(e);
}
}
private class ResponseDataBuffer {
private final byte[] data;

View File

@ -5,15 +5,11 @@ import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlDelayTimer;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainCrawlFrontier;
import nu.marginalia.crawl.retreival.CrawledDocumentFactory;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.Jsoup;
import java.util.function.Consumer;
/** This class encapsulates the logic for re-visiting a domain that has already been crawled.
* We may use information from the previous crawl to inform the next crawl, specifically the
* E-Tag and Last-Modified headers.
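As a sketch of how a revisit run is seeded, following the shape of the updated CrawlerRetreiverTest at the end of this commit; previousCrawlFile, newWarcFile, fetcher and specs are placeholders:

    // Recrawl with a reference to the previous crawl's data, so E-Tag and
    // Last-Modified values can be replayed as conditional requests
    try (var oldData = CrawledDomainReader.createDataStream(previousCrawlFile);
         var recorder = new WarcRecorder(newWarcFile)) {
        new CrawlerRetreiver(fetcher, new DomainProber(d -> true), specs, recorder)
                .fetch(new DomainLinks(), new CrawlDataReference(oldData));
    }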
@ -27,16 +23,13 @@ public class CrawlerRevisitor {
private final DomainCrawlFrontier crawlFrontier;
private final Consumer<SerializableCrawlData> crawledDomainWriter;
private final CrawlerRetreiver crawlerRetreiver;
private final WarcRecorder warcRecorder;
public CrawlerRevisitor(DomainCrawlFrontier crawlFrontier,
Consumer<SerializableCrawlData> crawledDomainWriter,
CrawlerRetreiver crawlerRetreiver,
WarcRecorder warcRecorder) {
this.crawlFrontier = crawlFrontier;
this.crawledDomainWriter = crawledDomainWriter;
this.crawlerRetreiver = crawlerRetreiver;
this.warcRecorder = warcRecorder;
}
@ -69,7 +62,7 @@ public class CrawlerRevisitor {
if (doc.httpStatus != 200) continue;
if (!robotsRules.isAllowed(url.toString())) {
crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(url));
warcRecorder.flagAsRobotsTxtError(url);
continue;
}
if (!crawlFrontier.filterLink(url))
@ -87,7 +80,6 @@ public class CrawlerRevisitor {
// fashion to make sure we eventually catch changes over time
// and ensure we discover new links
crawledDomainWriter.accept(doc);
crawlFrontier.addVisited(url);
// Hoover up any links from the document
@ -97,7 +89,7 @@ public class CrawlerRevisitor {
}
// Add a WARC record so we don't repeat this
warcRecorder.flagAsSkipped(url, doc.headers, doc.httpStatus, doc.documentBody);
warcRecorder.flagAsSkipped(url, doc.contentType, doc.httpStatus, doc.documentBody);
continue;
}
@ -107,15 +99,14 @@ public class CrawlerRevisitor {
// providing etag and last-modified headers, so we can recycle the
// document if it hasn't changed without actually downloading it
var fetchedDocOpt = crawlerRetreiver.fetchWriteAndSleep(url,
delayTimer,
new DocumentWithReference(doc, oldCrawlData));
if (fetchedDocOpt.isEmpty()) continue;
var reference = new DocumentWithReference(doc, oldCrawlData);
var result = crawlerRetreiver.fetchWriteAndSleep(url, delayTimer, reference);
if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++;
if (reference.isSame(result)) {
retained++;
}
recrawled ++;
recrawled++;
}
return recrawled;

View File

@ -1,12 +1,15 @@
package nu.marginalia.crawl.retreival.revisit;
import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeUrl;
import javax.annotation.Nullable;
import java.time.LocalDateTime;
public record DocumentWithReference(
@Nullable CrawledDocument doc,
@ -18,17 +21,28 @@ public record DocumentWithReference(
return emptyInstance;
}
public boolean isContentBodySame(CrawledDocument newDoc) {
/** Returns true if the provided document is the same as the reference document,
* or if the result was retained via HTTP 304.
*/
public boolean isSame(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultSame)
return true;
if (result instanceof HttpFetchResult.ResultRetained)
return true;
if (!(result instanceof HttpFetchResult.ResultOk resultOk))
return false;
if (reference == null)
return false;
if (doc == null)
return false;
if (doc.documentBody == null)
return false;
if (newDoc.documentBody == null)
return false;
return reference.isContentBodySame(doc, newDoc);
return DocumentBodyExtractor.extractBody(resultOk)
.map((contentType, body) -> reference.isContentBodySame(doc.documentBody, body))
.orElse(false);
}
public ContentTags getContentTags() {
@ -60,23 +74,4 @@ public record DocumentWithReference(
return doc == null || reference == null;
}
/**
* If the provided document has HTTP status 304, and the reference document is provided,
* return the reference document; otherwise return the provided document.
*/
public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) {
if (doc == null)
return fetchedDoc;
// HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when
// we fetched it last time. We can recycle the reference document.
if (fetchedDoc.httpStatus != 304)
return fetchedDoc;
var ret = doc;
ret.recrawlState = CrawlerRevisitor.documentWasRetainedTag;
ret.timestamp = LocalDateTime.now().toString();
return ret;
}
}

View File

@ -8,9 +8,7 @@ import okhttp3.Request;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.*;
import java.io.IOException;
import java.net.URISyntaxException;
@ -22,6 +20,7 @@ import java.util.Map;
import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
class WarcRecorderTest {
Path fileName;
@ -33,7 +32,7 @@ class WarcRecorderTest {
.addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
.build();
fileName = Files.createTempFile("test", ".warc.gz");
fileName = Files.createTempFile("test", ".warc");
client = new WarcRecorder(fileName);
}
@ -73,10 +72,7 @@ class WarcRecorderTest {
try (var recorder = new WarcRecorder(fileName)) {
recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
"""
Content-type: text/html
X-Cookies: 1
""",
"text/html",
200,
"<?doctype html><html><body>test</body></html>");
}
@ -95,5 +91,27 @@ class WarcRecorderTest {
new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out);
}
@Test
public void testSaveImport() throws URISyntaxException, IOException {
try (var recorder = new WarcRecorder(fileName)) {
recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
"text/html",
200,
"<?doctype html><html><body>test</body></html>");
}
try (var reader = new WarcReader(fileName)) {
WarcXResponseReference.register(reader);
for (var record : reader) {
System.out.println(record.type());
System.out.println(record.getClass().getSimpleName());
if (record instanceof WarcXResponseReference rsp) {
assertEquals("https://www.marginalia.nu/", rsp.target());
}
}
}
}
}

View File

@ -4,8 +4,10 @@ import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.crawling.body.ContentTypeLogic;
import nu.marginalia.model.EdgeUrl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@ -33,10 +35,11 @@ class HttpFetcherTest {
void fetchUTF8() throws URISyntaxException, RateLimitException, IOException {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
try (var recorder = new WarcRecorder()) {
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty());
System.out.println(str.contentType);
var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty());
if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) {
System.out.println(bodyOk.contentType());
}
}
}
@Test
@ -44,8 +47,10 @@ class HttpFetcherTest {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
try (var recorder = new WarcRecorder()) {
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty());
System.out.println(str.contentType);
var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty());
if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) {
System.out.println(bodyOk.contentType());
}
}
}
}

View File

@ -5,6 +5,7 @@ import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.DomainProber;
import nu.marginalia.crawl.retreival.fetcher.*;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
@ -13,6 +14,7 @@ import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import nu.marginalia.test.CommonTestData;
import okhttp3.Headers;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@ -21,12 +23,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.*;
public class CrawlerMockFetcherTest {
@ -65,9 +62,9 @@ public class CrawlerMockFetcherTest {
}
void crawl(CrawlSpecRecord spec, Consumer<SerializableCrawlData> consumer) throws IOException {
void crawl(CrawlSpecRecord spec) throws IOException {
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder, consumer)
new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder)
.fetch();
}
}
@ -80,9 +77,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()), out::add);
out.forEach(System.out::println);
crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()));
}
@Test
@ -91,9 +86,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()), out::add);
out.forEach(System.out::println);
crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()));
}
@Test
@ -104,9 +97,7 @@ public class CrawlerMockFetcherTest {
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()), out::add);
out.forEach(System.out::println);
crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()));
}
class MockFetcher implements HttpFetcher {
@ -126,21 +117,23 @@ public class CrawlerMockFetcherTest {
return new FetchResult(FetchResultState.OK, url);
}
@SneakyThrows
@Override
public CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) {
public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) {
logger.info("Fetching {}", url);
if (mockData.containsKey(url)) {
return mockData.get(url);
}
else {
return CrawledDocument.builder()
.crawlId("1")
.url(url.toString())
.contentType("text/html")
.httpStatus(404)
.crawlerStatus(CrawlerDocumentStatus.ERROR.name())
.build();
byte[] bodyBytes = mockData.get(url).documentBody.getBytes();
return new HttpFetchResult.ResultOk(
url.asURI(),
200,
new Headers.Builder().build(),
bodyBytes,
0,
bodyBytes.length
);
}
return new HttpFetchResult.ResultNone();
}
@Override

View File

@ -16,15 +16,14 @@ import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.*;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -33,6 +32,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
class CrawlerRetreiverTest {
private HttpFetcher httpFetcher;
Path tempFile;
Path tempFile2;
@BeforeEach
public void setUp() {
httpFetcher = new HttpFetcherImpl("search.marginalia.nu; testing a bit :D");
@ -45,6 +46,15 @@ class CrawlerRetreiverTest {
System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());
}
@AfterEach
public void tearDown() throws IOException {
if (tempFile != null) {
Files.deleteIfExists(tempFile);
}
if (tempFile2 != null) {
Files.deleteIfExists(tempFile2);
}
}
@Test
public void testWarcOutput() throws IOException {
var specs = CrawlSpecRecord
@ -57,10 +67,8 @@ class CrawlerRetreiverTest {
try {
tempFile = Files.createTempFile("crawling-process", "warc");
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder(tempFile)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch();
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
} catch (IOException ex) {
Assertions.fail(ex);
}
@ -93,7 +101,7 @@ class CrawlerRetreiverTest {
}
}
@Test
public void testWithKnownDomains() {
public void testWithKnownDomains() throws IOException {
var specs = CrawlSpecRecord
.builder()
.crawlDepth(5)
@ -103,15 +111,30 @@ class CrawlerRetreiverTest {
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch();
tempFile = Files.createTempFile("crawling-process", ".warc");
try (var recorder = new WarcRecorder(tempFile)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
var fetchedUrls =
data.stream().filter(CrawledDocument.class::isInstance)
data.stream()
.peek(System.out::println)
.filter(CrawledDocument.class::isInstance)
.map(CrawledDocument.class::cast)
.map(doc -> doc.url)
.collect(Collectors.toSet());
@ -126,7 +149,7 @@ class CrawlerRetreiverTest {
}
@Test
public void testEmptySet() {
public void testEmptySet() throws IOException {
var specs = CrawlSpecRecord
.builder()
@ -135,15 +158,30 @@ class CrawlerRetreiverTest {
.urls(List.of())
.build();
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch();
tempFile = Files.createTempFile("crawling-process", ".warc");
try (var recorder = new WarcRecorder(tempFile)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDocument doc) {
data.add(doc);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
data.stream().filter(CrawledDocument.class::isInstance)
.map(CrawledDocument.class::cast)
.forEach(doc -> System.out.println(doc.url + "\t" + doc.crawlerStatus + "\t" + doc.httpStatus));
@ -174,43 +212,70 @@ class CrawlerRetreiverTest {
.build();
Path out = Files.createTempDirectory("crawling-process");
var writer = new CrawledDomainWriter(out, specs.domain, "idid");
tempFile = Files.createTempFile("crawling-process", ".warc.gz");
tempFile2 = Files.createTempFile("crawling-process", ".warc.gz");
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, d -> {
data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d);
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
if (Math.random() > 0.5) {
doc.headers = "";
}
}
writer.accept(d);
}).fetch();
try (var recorder = new WarcRecorder(tempFile)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
writer.close();
var reader = new CrawledDomainReader();
var stream = reader.createDataStream(out, specs.domain, "idid");
try (var stream = CrawledDomainReader.createDataStream(tempFile)) {
while (stream.hasNext()) {
var doc = stream.next();
data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
}
} catch (Exception e) {
throw new RuntimeException(e);
}
var stream = CrawledDomainReader.createDataStream(tempFile);
CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, d -> {
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
}
}).fetch(new DomainLinks(), new CrawlDataReference(stream));
try (var recorder = new WarcRecorder(tempFile2)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(new DomainLinks(),
new CrawlDataReference(stream));
}
catch (IOException ex) {
Assertions.fail(ex);
}
new GZIPInputStream(Files.newInputStream(tempFile2)).transferTo(System.out);
try (var reader = new WarcReader(tempFile2)) {
WarcXResponseReference.register(reader);
reader.forEach(record -> {
if (record instanceof WarcResponse rsp) {
try {
System.out.println(rsp.type() + ":" + rsp.target() + "/" + rsp.http().status());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
if (record instanceof WarcMetadata rsp) {
System.out.println("meta:" + rsp.target());
}
});
}
try (var ds = CrawledDomainReader.createDataStream(tempFile2)) {
while (ds.hasNext()) {
var doc = ds.next();
if (doc instanceof CrawledDomain dr) {
System.out.println(dr.domain + "/" + dr.crawlerStatus);
}
else if (doc instanceof CrawledDocument dc) {
System.out.println(dc.url + "/" + dc.crawlerStatus + "/" + dc.httpStatus);
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -63,8 +63,6 @@ public class ExportAtagsActor extends RecordActorPrototype {
Path inputDir = storageService.getStorage(crawlId).asPath();
var reader = new CrawledDomainReader();
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
)
{
@ -78,7 +76,7 @@ public class ExportAtagsActor extends RecordActorPrototype {
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = reader.createDataStream(crawlDataPath)) {
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
exportLinks(tagWriter, stream);
}
catch (Exception ex) {

View File

@ -29,13 +29,11 @@ public class CrawlDataUnfcker {
return;
}
var reader = new CrawledDomainReader();
try (var wl = new WorkLog(output.resolve("crawler.log"))) {
for (var inputItem : WorkLog.iterable(input.resolve("crawler.log"))) {
Path inputPath = input.resolve(inputItem.relPath());
var domainMaybe = readDomain(reader, inputPath).map(CrawledDomain::getDomain);
var domainMaybe = readDomain(inputPath).map(CrawledDomain::getDomain);
if (domainMaybe.isEmpty())
continue;
var domain = domainMaybe.get();
@ -43,7 +41,7 @@ public class CrawlDataUnfcker {
// Generate conformant ID
String newId = Integer.toHexString(domain.hashCode());
var outputPath = CrawlerOutputFile.createOutputPath(output, newId, domain);
var outputPath = CrawlerOutputFile.createLegacyOutputPath(output, newId, domain);
var outputFileName = outputPath.toFile().getName();
System.out.println(inputPath + " -> " + outputPath);
@ -56,13 +54,13 @@ public class CrawlDataUnfcker {
}
}
static Optional<CrawledDomain> readDomain(CrawledDomainReader reader, Path file) {
static Optional<CrawledDomain> readDomain(Path file) {
if (!Files.exists(file)) {
System.out.println("Missing file " + file);
return Optional.empty();
}
try (var stream = reader.createDataStream(file)) {
try (var stream = CrawledDomainReader.createDataStream(file)) {
while (stream.hasNext()) {
if (stream.next() instanceof CrawledDomain domain) {
return Optional.of(domain);

View File

@ -50,10 +50,9 @@ public class ExperimentRunnerMain {
experiment.args(Arrays.copyOfRange(args, 2, args.length));
Path basePath = Path.of(args[0]);
var reader = new CrawledDomainReader();
for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
Path crawlDataPath = basePath.resolve(item.relPath());
try (var stream = reader.createDataStream(crawlDataPath)) {
try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
experiment.process(stream);
}
catch (Exception ex) {

View File

@ -5,12 +5,12 @@ import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
public abstract class LegacyExperiment extends Experiment {
public abstract boolean process(CrawledDomain domain);
@Override
public boolean process(SerializableCrawlDataStream dataStream) throws IOException {
List<CrawledDocument> documentList = new ArrayList<>();

View File

@ -155,7 +155,7 @@ dependencyResolutionManagement {
library('duckdb', 'org.duckdb', 'duckdb_jdbc').version('0.9.1')
library('okhttp3','com.squareup.okhttp3','okhttp').version('4.11.0')
library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.4')
library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.5')
library('httpcomponents.core','org.apache.httpcomponents','httpcore').version('4.4.15')
library('httpcomponents.client','org.apache.httpcomponents','httpclient').version('4.5.13')