From 440e097d7879f01a6f54b8489e3892129b3f1970 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 13 Dec 2023 15:33:42 +0100 Subject: [PATCH] (crawler) WIP integration of WARC files into the crawler and converter process. This commit is in a pretty rough state. It refactors the crawler fairly significantly to offer better separation of concerns. It replaces the zstd-compressed JSON files used to store crawl data with WARC files entirely, and the converter is modified to be able to consume this data. This works, -ish. There appears to be some bug relating to reading robots.txt, and the X-Robots-Tag header is no longer processed either. A problem is that the WARC files are a bit too large. It will probably be necessary to introduce a new format to store the crawl data long term, something like parquet, and use WARCs for intermediate storage to enable the crawler to be restarted without needing a recrawl. --- .../crawling-model/build.gradle | 5 + .../crawling/body}/ContentTypeLogic.java | 2 +- .../crawling/body/DocumentBodyExtractor.java | 60 +++++++ .../crawling/body/DocumentBodyResult.java | 23 +++ .../crawling/body}/HttpFetchResult.java | 56 +++++- .../crawling/io/CrawledDomainReader.java | 154 +++------------- .../crawling/io/CrawledDomainWriter.java | 2 +- .../crawling/io/CrawlerOutputFile.java | 37 +++- .../io/SerializableCrawlDataStream.java | 6 +- ...ileReadingSerializableCrawlDataStream.java | 70 ++++++++ ...arcReadingSerializableCrawlDataStream.java | 156 ++++++++++++++++ .../jwarc/WarcXResponseReference.java | 44 +++++ .../src/main/java/plan/CrawlPlan.java | 18 +- ...CrawlingThenConvertingIntegrationTest.java | 2 +- .../java/nu/marginalia/crawl/CrawlerMain.java | 33 ++-- .../crawl/retreival/CrawlDataReference.java | 19 +- .../retreival/CrawledDocumentFactory.java | 2 +- .../crawl/retreival/CrawlerRetreiver.java | 168 ++++++++---------- .../retreival/CrawlerWarcResynchronizer.java | 8 +- .../retreival/fetcher/ContentTypeProber.java | 2 +- .../crawl/retreival/fetcher/HttpFetcher.java | 5 +- .../retreival/fetcher/HttpFetcherImpl.java | 168 ++++++------------ .../fetcher/body/DocumentBodyExtractor.java | 44 ----- .../fetcher/body/DocumentBodyResult.java | 8 - .../retreival/fetcher/warc/WarcRecorder.java | 165 +++++++++++++---- .../retreival/revisit/CrawlerRevisitor.java | 25 +-- .../revisit/DocumentWithReference.java | 45 +++-- .../retreival/fetcher/WarcRecorderTest.java | 34 +++- .../marginalia/crawling/HttpFetcherTest.java | 17 +- .../retreival/CrawlerMockFetcherTest.java | 49 +++-- .../retreival/CrawlerRetreiverTest.java | 139 +++++++++++---- .../actor/task/ExportAtagsActor.java | 4 +- .../nu/marginalia/tools/CrawlDataUnfcker.java | 10 +- .../tools/ExperimentRunnerMain.java | 3 +- .../nu/marginalia/tools/LegacyExperiment.java | 2 +- settings.gradle | 2 +- 36 files changed, 966 insertions(+), 621 deletions(-) rename code/{processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic => process-models/crawling-model/src/main/java/nu/marginalia/crawling/body}/ContentTypeLogic.java (97%) create mode 100644 code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java create mode 100644 code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java rename code/{processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc => process-models/crawling-model/src/main/java/nu/marginalia/crawling/body}/HttpFetchResult.java (63%) create mode 100644 
code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java create mode 100644 code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java create mode 100644 code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java delete mode 100644 code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java delete mode 100644 code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java diff --git a/code/process-models/crawling-model/build.gradle b/code/process-models/crawling-model/build.gradle index ebbea855..f1f77a70 100644 --- a/code/process-models/crawling-model/build.gradle +++ b/code/process-models/crawling-model/build.gradle @@ -20,13 +20,18 @@ dependencies { implementation project(':code:api:index-api') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') + implementation project(':code:features-crawl:content-type') implementation project(':code:libraries:language-processing') implementation libs.bundles.slf4j implementation libs.notnull + implementation libs.jwarc implementation libs.gson + implementation libs.commons.io + implementation libs.okhttp3 + implementation libs.jsoup implementation libs.snakeyaml implementation libs.zstd diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic/ContentTypeLogic.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/ContentTypeLogic.java similarity index 97% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic/ContentTypeLogic.java rename to code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/ContentTypeLogic.java index c5860913..d7dfa6d1 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic/ContentTypeLogic.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/ContentTypeLogic.java @@ -1,4 +1,4 @@ -package nu.marginalia.crawl.retreival.logic; +package nu.marginalia.crawling.body; import nu.marginalia.model.EdgeUrl; diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java new file mode 100644 index 00000000..7bb548e5 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java @@ -0,0 +1,60 @@ +package nu.marginalia.crawling.body; + +import nu.marginalia.contenttype.ContentTypeParser; +import nu.marginalia.contenttype.DocumentBodyToString; +import nu.marginalia.crawling.model.CrawlerDocumentStatus; +import org.apache.commons.io.input.BOMInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.zip.GZIPInputStream; + +public class DocumentBodyExtractor { + private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); + + private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class); + + public static DocumentBodyResult extractBody(HttpFetchResult result) { + if (result instanceof HttpFetchResult.ResultOk fetchOk) { + return extractBody(fetchOk); + } + else { + return new 
DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, ""); + } + } + + public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) { + try { + var byteStream = rsp.getInputStream(); + + if ("gzip".equals(rsp.header("Content-Encoding"))) { + byteStream = new GZIPInputStream(byteStream); + } + byteStream = new BOMInputStream(byteStream); + + var contentTypeHeader = rsp.header("Content-Type"); + if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); + } + + byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder + + var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); + if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); + } + + if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, ""); + } + + return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data)); + } + catch (IOException ex) { + logger.error("Failed to extract body", ex); + return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, ""); + } + } + +} diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java new file mode 100644 index 00000000..1959f844 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java @@ -0,0 +1,23 @@ +package nu.marginalia.crawling.body; + +import nu.marginalia.crawling.model.CrawlerDocumentStatus; + +import java.util.Optional; +import java.util.function.BiFunction; + +public sealed interface DocumentBodyResult { + record Ok(String contentType, String body) implements DocumentBodyResult { + @Override + public Optional map(BiFunction fun) { + return Optional.of(fun.apply(contentType, body)); + } + } + record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { + @Override + public Optional map(BiFunction fun) { + return Optional.empty(); + } + } + + Optional map(BiFunction fun); +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/HttpFetchResult.java similarity index 63% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java rename to code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/HttpFetchResult.java index ae9673b1..9790e3da 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/HttpFetchResult.java @@ -1,17 +1,23 @@ -package nu.marginalia.crawl.retreival.fetcher.warc; +package nu.marginalia.crawling.body; import okhttp3.Headers; +import org.jsoup.Jsoup; import org.netpreserve.jwarc.MessageHeaders; import org.netpreserve.jwarc.WarcResponse; import org.netpreserve.jwarc.WarcRevisit; +import org.jsoup.nodes.Document; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.List; 
+import java.util.Optional; public sealed interface HttpFetchResult { + + boolean isOk(); + static ResultOk importWarc(WarcResponse response) throws IOException { var http = response.http(); try (var body = http.body()) { @@ -27,6 +33,7 @@ public sealed interface HttpFetchResult { ); } } + static ResultOk importWarc(WarcRevisit revisit) throws IOException { var http = revisit.http(); try (var body = http.body()) { @@ -41,7 +48,11 @@ public sealed interface HttpFetchResult { bytes.length ); } + finally { + revisit.body().consume(); + } } + record ResultOk(URI uri, int statusCode, Headers headers, @@ -50,6 +61,10 @@ public sealed interface HttpFetchResult { int bytesLength ) implements HttpFetchResult { + public boolean isOk() { + return statusCode >= 200 && statusCode < 300; + } + public ResultOk(URI uri, int statusCode, MessageHeaders headers, @@ -73,6 +88,14 @@ public sealed interface HttpFetchResult { return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength); } + public Optional parseDocument() throws IOException { + return switch(DocumentBodyExtractor.extractBody(this)) { + case DocumentBodyResult.Ok ok when "text/html".equalsIgnoreCase(ok.contentType()) + -> Optional.of(Jsoup.parse(ok.body())); + default -> Optional.empty(); + }; + } + public String header(String name) { return headers.get(name); } @@ -82,5 +105,34 @@ public sealed interface HttpFetchResult { }; - record ResultError(Exception ex) implements HttpFetchResult { }; + record ResultRetained(String url, String body) implements HttpFetchResult { + + public boolean isOk() { + return true; + } + + public Optional parseDocument() { + try { + return Optional.of(Jsoup.parse(body)); + } + catch (Exception ex) { + return Optional.empty(); + } + } + }; + record ResultException(Exception ex) implements HttpFetchResult { + public boolean isOk() { + return false; + } + }; + record ResultSame() implements HttpFetchResult { + public boolean isOk() { + return false; + } + }; + record ResultNone() implements HttpFetchResult { + public boolean isOk() { + return false; + } + }; } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java index b7021ace..0da0b790 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java @@ -1,156 +1,44 @@ package nu.marginalia.crawling.io; -import com.github.luben.zstd.RecyclingBufferPool; -import com.github.luben.zstd.ZstdInputStream; import com.google.gson.Gson; -import nu.marginalia.crawling.model.CrawledDocument; -import nu.marginalia.crawling.model.CrawledDomain; -import nu.marginalia.crawling.model.SerializableCrawlData; +import nu.marginalia.crawling.io.format.LegacyFileReadingSerializableCrawlDataStream; +import nu.marginalia.crawling.io.format.WarcReadingSerializableCrawlDataStream; import nu.marginalia.model.gson.GsonFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.*; +import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; public class CrawledDomainReader { - private final Gson gson = GsonFactory.get(); - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final 
ForkJoinPool pool = new ForkJoinPool(6); + private static final Gson gson = GsonFactory.get(); public CrawledDomainReader() { } /** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */ - public SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException { - return new FileReadingSerializableCrawlDataStream(gson, fullPath.toFile()); + public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException { + String fileName = fullPath.getFileName().toString(); + if (fileName.endsWith(".zstd")) { + return new LegacyFileReadingSerializableCrawlDataStream(gson, fullPath.toFile()); + } + else if (fileName.endsWith(".warc") || fileName.endsWith(".warc.gz")) { + return new WarcReadingSerializableCrawlDataStream(fullPath); + } + else { + throw new IllegalArgumentException("Unknown file type: " + fullPath); + } } /** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */ - public SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException { - return createDataStream(CrawlerOutputFile.getOutputFile(basePath, id, domain)); - } + public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException { + Path warcPath = CrawlerOutputFile.getWarcPath(basePath, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); - /** Read the entirety of the domain data into memory. This uses a lot of RAM */ - public CrawledDomain read(Path path) throws IOException { - DomainDataAssembler domainData = new DomainDataAssembler(); - - try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(path.toFile()), RecyclingBufferPool.INSTANCE)))) { - String line; - while ((line = br.readLine()) != null) { - if (line.startsWith("//")) { - String identifier = line; - String data = br.readLine(); - - pool.execute(() -> deserializeLine(identifier, data, domainData)); - } - } + if (Files.exists(warcPath)) { + return createDataStream(warcPath); } - - while (!pool.awaitQuiescence(1, TimeUnit.SECONDS)); - - return domainData.assemble(); - } - - - private void deserializeLine(String identifier, String data, DomainDataAssembler assembler) { - if (null == data) { - return; - } - if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) { - assembler.acceptDomain(gson.fromJson(data, CrawledDomain.class)); - } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) { - assembler.acceptDoc(gson.fromJson(data, CrawledDocument.class)); + else { + return createDataStream(CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain)); } } - public Optional readOptionally(Path path) { - try { - return Optional.of(read(path)); - } - catch (Exception ex) { - return Optional.empty(); - } - } - - private static class DomainDataAssembler { - private CrawledDomain domainPrototype; - private final List docs = new ArrayList<>(); - - public synchronized void acceptDomain(CrawledDomain domain) { - this.domainPrototype = domain; - } - - public synchronized void acceptDoc(CrawledDocument doc) { - docs.add(doc); - } - - public synchronized CrawledDomain assemble() { - if (!docs.isEmpty()) { - if (domainPrototype.doc == null) - domainPrototype.doc = new ArrayList<>(); - - domainPrototype.doc.addAll(docs); - } - return domainPrototype; - } - } - - private static class FileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { - private final Gson gson; - 
private final BufferedReader bufferedReader; - private SerializableCrawlData next = null; - - public FileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException { - this.gson = gson; - bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE))); - } - - @Override - public SerializableCrawlData next() throws IOException { - if (hasNext()) { - var ret = next; - next = null; - return ret; - } - throw new IllegalStateException("No more data"); - } - - @Override - public boolean hasNext() throws IOException { - if (next != null) - return true; - - String identifier = bufferedReader.readLine(); - if (identifier == null) { - bufferedReader.close(); - return false; - } - String data = bufferedReader.readLine(); - if (data == null) { - bufferedReader.close(); - return false; - } - - if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) { - next = gson.fromJson(data, CrawledDomain.class); - } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) { - next = gson.fromJson(data, CrawledDocument.class); - } - else { - throw new IllegalStateException("Unknown identifier: " + identifier); - } - return true; - } - - @Override - public void close() throws Exception { - bufferedReader.close(); - } - } } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java index 0e278f09..f21715ee 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java @@ -55,7 +55,7 @@ public class CrawledDomainWriter implements AutoCloseable { } private Path getOutputFile(String id, String name) throws IOException { - return CrawlerOutputFile.createOutputPath(outputDir, id, name); + return CrawlerOutputFile.createLegacyOutputPath(outputDir, id, name); } @Override diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java index 67e8738c..907eb081 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java @@ -9,7 +9,11 @@ import java.nio.file.Path; public class CrawlerOutputFile { /** Return the Path to a file for the given id and name */ - public static Path getOutputFile(Path base, String id, String name) { + public static Path getLegacyOutputFile(Path base, String id, String name) { + if (id.length() < 4) { + id = Strings.repeat("0", 4 - id.length()) + id; + } + String first = id.substring(0, 2); String second = id.substring(2, 4); @@ -19,7 +23,7 @@ public class CrawlerOutputFile { /** Return the Path to a file for the given id and name, creating the prerequisite * directory structure as necessary. 
*/ - public static Path createOutputPath(Path base, String id, String name) throws IOException { + public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException { if (id.length() < 4) { id = Strings.repeat("0", 4 - id.length()) + id; } @@ -49,20 +53,37 @@ public class CrawlerOutputFile { } - public static Path createWarcFile(Path baseDir, String id, String name, WarcFileVersion version) { + public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException { if (id.length() < 4) { id = Strings.repeat("0", 4 - id.length()) + id; } - String fileName = STR."\{id}-\{filesystemSafeName(name)}.zstd\{version.suffix}"; + String first = id.substring(0, 2); + String second = id.substring(2, 4); - return baseDir.resolve(fileName); + Path destDir = basePath.resolve(first).resolve(second); + if (!Files.exists(destDir)) { + Files.createDirectories(destDir); + } + return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz"); + } + + public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) { + if (id.length() < 4) { + id = Strings.repeat("0", 4 - id.length()) + id; + } + + String first = id.substring(0, 2); + String second = id.substring(2, 4); + + Path destDir = basePath.resolve(first).resolve(second); + return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}"); } public enum WarcFileVersion { - LIVE(".open"), - TEMP(".tmp"), - FINAL(""); + LIVE("open"), + TEMP("tmp"), + FINAL("final"); public final String suffix; diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java index 3aecc0fc..9598d002 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java @@ -1,11 +1,13 @@ package nu.marginalia.crawling.io; import nu.marginalia.crawling.model.SerializableCrawlData; +import org.jetbrains.annotations.Nullable; import java.io.IOException; +import java.nio.file.Path; import java.util.Iterator; -/** Closable iterator over serialized crawl data +/** Closable iterator exceptional over serialized crawl data * The data may appear in any order, and the iterator must be closed. 
* * @see CrawledDomainReader @@ -17,6 +19,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable { boolean hasNext() throws IOException; + @Nullable + default Path path() { return null; } // Dummy iterator over nothing static SerializableCrawlDataStream empty() { diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java new file mode 100644 index 00000000..efff17f3 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java @@ -0,0 +1,70 @@ +package nu.marginalia.crawling.io.format; + +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.ZstdInputStream; +import com.google.gson.Gson; +import nu.marginalia.crawling.io.SerializableCrawlDataStream; +import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.crawling.model.CrawledDomain; +import nu.marginalia.crawling.model.SerializableCrawlData; + +import java.io.*; +import java.nio.file.Path; + +public class LegacyFileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { + private final Gson gson; + private final BufferedReader bufferedReader; + private SerializableCrawlData next = null; + + private final Path path; + public LegacyFileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException { + this.gson = gson; + bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE))); + path = file.toPath(); + } + + @Override + public Path path() { + return path; + } + @Override + public SerializableCrawlData next() throws IOException { + if (hasNext()) { + var ret = next; + next = null; + return ret; + } + throw new IllegalStateException("No more data"); + } + + @Override + public boolean hasNext() throws IOException { + if (next != null) + return true; + + String identifier = bufferedReader.readLine(); + if (identifier == null) { + bufferedReader.close(); + return false; + } + String data = bufferedReader.readLine(); + if (data == null) { + bufferedReader.close(); + return false; + } + + if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) { + next = gson.fromJson(data, CrawledDomain.class); + } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) { + next = gson.fromJson(data, CrawledDocument.class); + } else { + throw new IllegalStateException("Unknown identifier: " + identifier); + } + return true; + } + + @Override + public void close() throws Exception { + bufferedReader.close(); + } +} diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java new file mode 100644 index 00000000..9d8d1a63 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java @@ -0,0 +1,156 @@ +package nu.marginalia.crawling.io.format; + +import lombok.SneakyThrows; +import nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.DocumentBodyResult; +import nu.marginalia.crawling.body.HttpFetchResult; +import 
nu.marginalia.crawling.io.SerializableCrawlDataStream; +import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.crawling.model.CrawledDomain; +import nu.marginalia.crawling.model.SerializableCrawlData; +import org.netpreserve.jwarc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.StringJoiner; + +public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { + private static final Logger logger = LoggerFactory.getLogger(WarcReadingSerializableCrawlDataStream.class); + + private final WarcReader reader; + private final Iterator backingIterator; + private SerializableCrawlData next = null; + private final Path path; + + public WarcReadingSerializableCrawlDataStream(Path file) throws IOException { + path = file; + reader = new WarcReader(file); + WarcXResponseReference.register(reader); + + backingIterator = reader.iterator(); + } + + @Override + public Path path() { + return path; + } + + @Override + @SneakyThrows + public boolean hasNext() { + while (backingIterator.hasNext() && next == null) { + var nextRecord = backingIterator.next(); + if (nextRecord instanceof WarcResponse response) { // this also includes WarcXResponseReference + convertResponse(response); + } + else if (nextRecord instanceof Warcinfo warcinfo) { + convertWarcinfo(warcinfo); + } + else if (nextRecord instanceof WarcMetadata metadata) { + convertMetadata(metadata); + } + } + return next != null; + } + + private void convertMetadata(WarcMetadata metadata) { + // Nothing to do here for now + } + + private void convertWarcinfo(Warcinfo warcinfo) throws IOException { + var headers = warcinfo.fields(); + String probeStatus = headers.first("X-WARC-Probe-Status").orElse(""); + String[] parts = probeStatus.split(" ", 2); + + + String domain = headers.first("domain").orElseThrow(() -> new IllegalStateException("Missing domain header")); + String status = parts[0]; + String statusReason = parts.length > 1 ? 
parts[1] : ""; + String ip = headers.first("ip").orElse(""); + + String redirectDomain = null; + if ("REDIRECT".equalsIgnoreCase(status)) { + redirectDomain = statusReason; + } + + // TODO: Fix cookies info somehow + next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip, List.of(), List.of()); + } + + private void convertResponse(WarcResponse response) throws IOException { + var http = response.http(); + + if (http.status() != 200) { + return; + } + CrawledDocument document; + + var parsedBody = DocumentBodyExtractor.extractBody(HttpFetchResult.importWarc(response)); + if (parsedBody instanceof DocumentBodyResult.Error error) { + next = new CrawledDocument( + "", + response.targetURI().toString(), + http.contentType().raw(), + response.date().toString(), + http.status(), + error.status().toString(), + error.why(), + headers(http.headers()), + null, + response.payloadDigest().map(WarcDigest::base64).orElse(""), + "", + "", + ""); + } else if (parsedBody instanceof DocumentBodyResult.Ok ok) { + next = new CrawledDocument( + "", + response.targetURI().toString(), + ok.contentType(), + response.date().toString(), + http.status(), + "OK", + "", + headers(http.headers()), + ok.body(), + response.payloadDigest().map(WarcDigest::base64).orElse(""), + "", + "", + ""); + } else { + // unreachable + throw new IllegalStateException("Unknown body type: " + parsedBody); + } + } + + public String headers(MessageHeaders headers) { + StringJoiner ret = new StringJoiner("\n"); + for (var header : headers.map().entrySet()) { + for (var value : header.getValue()) { + ret.add(STR."\{header.getKey()}: \{value}"); + } + } + return ret.toString(); + } + + public void close() throws IOException { + reader.close(); + } + + @Override + public SerializableCrawlData next() throws IOException { + if (!hasNext()) + throw new NoSuchElementException(); + try { + return next; + } + finally { + next = null; + } + } + +} diff --git a/code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java b/code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java new file mode 100644 index 00000000..7e02d936 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java @@ -0,0 +1,44 @@ +package org.netpreserve.jwarc; + +import java.io.IOException; +import java.net.URI; + +/** This defines a non-standard extension to WARC for storing old HTTP responses, + * essentially a 'revisit' with a full body, which is not something that is + * expected by the jwarc parser, and goes against the semantics of the revisit + * records a fair bit. + *

+ * An x-response-reference record is a response record with a full body, where + * the data is a reconstructed HTTP response from a previous crawl. + */ +public class WarcXResponseReference extends WarcResponse { + private static final String TYPE_NAME = "x-response-reference"; + + WarcXResponseReference(MessageVersion version, MessageHeaders headers, MessageBody body) { + super(version, headers, body); + } + + public static void register(WarcReader reader) { + reader.registerType(TYPE_NAME, WarcXResponseReference::new); + } + + public static class Builder extends AbstractBuilder { + public Builder(URI targetURI) { + this(targetURI.toString()); + } + + public Builder(String targetURI) { + super(TYPE_NAME); + setHeader("WARC-Target-URI", targetURI); + } + + public Builder body(HttpResponse httpResponse) throws IOException { + return body(MediaType.HTTP_RESPONSE, httpResponse); + } + + @Override + public WarcXResponseReference build() { + return build(WarcXResponseReference::new); + } + } +} diff --git a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java index 718dea06..cbb88772 100644 --- a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java +++ b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java @@ -74,23 +74,13 @@ public class CrawlPlan { return count; } + @Deprecated public Iterable domainsIterable() { - final CrawledDomainReader reader = new CrawledDomainReader(); - - return WorkLog.iterableMap(crawl.getLogFile(), - entry -> { - var path = getCrawledFilePath(entry.path()); - if (!Files.exists(path)) { - logger.warn("File not found: {}", path); - return Optional.empty(); - } - return reader.readOptionally(path); - }); + // This is no longer supported + throw new UnsupportedOperationException(); } public Iterable crawlDataIterable(Predicate idPredicate) { - final CrawledDomainReader reader = new CrawledDomainReader(); - return WorkLog.iterableMap(crawl.getLogFile(), entry -> { if (!idPredicate.test(entry.id())) { @@ -105,7 +95,7 @@ public class CrawlPlan { } try { - return Optional.of(reader.createDataStream(path)); + return Optional.of(CrawledDomainReader.createDataStream(path)); } catch (IOException ex) { return Optional.empty(); diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java index 5b5deddc..67b4f7b6 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java @@ -79,7 +79,7 @@ public class CrawlingThenConvertingIntegrationTest { List data = new ArrayList<>(); try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } CrawledDomain domain = data.stream().filter(CrawledDomain.class::isInstance).map(CrawledDomain.class::cast).findFirst().get(); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index a5d78a1f..f5b5a10e 100644 --- 
a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -62,7 +62,6 @@ public class CrawlerMain { private final SimpleBlockingThreadPool pool; private final Map processingIds = new ConcurrentHashMap<>(); - private final CrawledDomainReader reader = new CrawledDomainReader(); final AbortMonitor abortMonitor = AbortMonitor.getInstance(); @@ -142,6 +141,7 @@ public class CrawlerMain { public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException { heartbeat.start(); + try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log")); AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains()) ) { @@ -213,9 +213,9 @@ public class CrawlerMain { @Override public void run() throws Exception { - Path newWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); - Path tempFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); - Path finalWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); + Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); + Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); + Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); if (Files.exists(newWarcFile)) { Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING); @@ -224,9 +224,8 @@ public class CrawlerMain { Files.deleteIfExists(tempFile); } - try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id); - var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now - var retreiver = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder, writer::accept); + try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now + var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder); CrawlDataReference reference = getReference()) { Thread.currentThread().setName("crawling:" + domain); @@ -234,39 +233,37 @@ public class CrawlerMain { var domainLinks = anchorTagsSource.getAnchorTags(domain); if (Files.exists(tempFile)) { - retreiver.syncAbortedRun(tempFile); + retriever.syncAbortedRun(tempFile); Files.delete(tempFile); } - int size = retreiver.fetch(domainLinks, reference); + int size = retriever.fetch(domainLinks, reference); + + // Delete the reference crawl data if it's not the same as the new one + // (mostly a case when migrating from legacy->warc) + reference.delete(); Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING); - workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size); + workLog.setJobToFinished(domain, finalWarcFile.toString(), size); heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); logger.info("Fetched {}", domain); } catch (Exception e) { logger.error("Error fetching domain " + domain, e); Files.deleteIfExists(newWarcFile); - if (tempFile != null) { - Files.deleteIfExists(tempFile); - } + Files.deleteIfExists(tempFile); } finally { // We don't need to double-count these; it's also kept int he workLog processingIds.remove(domain); Thread.currentThread().setName("[idle]"); - - // 
FIXME: Remove this when we're done - Files.deleteIfExists(finalWarcFile); } } private CrawlDataReference getReference() { try { - var dataStream = reader.createDataStream(outputDir, domain, id); - return new CrawlDataReference(dataStream); + return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id)); } catch (IOException e) { logger.debug("Failed to read previous crawl data for {}", specification.domain); return new CrawlDataReference(); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java index 985bfc39..9088ebb4 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java @@ -8,6 +8,8 @@ import nu.marginalia.lsh.EasyLSH; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; /** A reference to a domain that has been crawled before. */ public class CrawlDataReference implements AutoCloseable { @@ -22,6 +24,15 @@ public class CrawlDataReference implements AutoCloseable { this(SerializableCrawlDataStream.empty()); } + /** Delete the associated data from disk, if it exists */ + public void delete() throws IOException { + Path filePath = data.path(); + + if (filePath != null) { + Files.deleteIfExists(filePath); + } + } + @Nullable public CrawledDocument nextDocument() { try { @@ -37,12 +48,10 @@ public class CrawlDataReference implements AutoCloseable { return null; } - public boolean isContentBodySame(CrawledDocument one, CrawledDocument other) { - assert one.documentBody != null; - assert other.documentBody != null; + public boolean isContentBodySame(String one, String other) { - final long contentHashOne = contentHash(one.documentBody); - final long contentHashOther = contentHash(other.documentBody); + final long contentHashOne = contentHash(one); + final long contentHashOther = contentHash(other); return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4; } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java index b3ab9ee5..37f84d58 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java @@ -1,6 +1,6 @@ package nu.marginalia.crawl.retreival; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawlerDocumentStatus; import nu.marginalia.model.EdgeUrl; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java index 30054008..514243ee 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java @@ -7,7 +7,7 @@ import lombok.SneakyThrows; import 
nu.marginalia.atags.model.DomainLinks; import nu.marginalia.crawl.retreival.fetcher.ContentTags; import nu.marginalia.crawl.retreival.fetcher.HttpFetcher; -import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor; import nu.marginalia.crawl.retreival.revisit.DocumentWithReference; @@ -18,16 +18,15 @@ import nu.marginalia.ip_blocklist.UrlBlocklist; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.crawlspec.CrawlSpecRecord; -import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.file.Path; import java.util.*; -import java.util.function.Consumer; public class CrawlerRetreiver implements AutoCloseable { @@ -36,7 +35,6 @@ public class CrawlerRetreiver implements AutoCloseable { private final HttpFetcher fetcher; private final String domain; - private final Consumer crawledDomainWriter; private static final LinkParser linkParser = new LinkParser(); private static final Logger logger = LoggerFactory.getLogger(CrawlerRetreiver.class); @@ -56,8 +54,7 @@ public class CrawlerRetreiver implements AutoCloseable { public CrawlerRetreiver(HttpFetcher fetcher, DomainProber domainProber, CrawlSpecRecord specs, - WarcRecorder warcRecorder, - Consumer writer) + WarcRecorder warcRecorder) { this.warcRecorder = warcRecorder; this.fetcher = fetcher; @@ -65,11 +62,8 @@ public class CrawlerRetreiver implements AutoCloseable { domain = specs.domain; - crawledDomainWriter = writer; - - crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth); - crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, crawledDomainWriter, this, warcRecorder); + crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, this, warcRecorder); sitemapFetcher = new SitemapFetcher(crawlFrontier, fetcher.createSitemapRetriever()); // We must always crawl the index page first, this is assumed when fingerprinting the server @@ -94,32 +88,13 @@ public class CrawlerRetreiver implements AutoCloseable { public int fetch(DomainLinks domainLinks, CrawlDataReference oldCrawlData) { final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek()); - return switch (probeResult) { - case DomainProber.ProbeResultOk(EdgeUrl probedUrl) -> crawlDomain(oldCrawlData, probedUrl, domainLinks); - case DomainProber.ProbeResultError(CrawlerDomainStatus status, String desc) -> { - crawledDomainWriter.accept( - CrawledDomain.builder() - .crawlerStatus(status.name()) - .crawlerStatusDesc(desc) - .domain(domain) - .ip(findIp(domain)) - .build() - ); - yield 1; - } - case DomainProber.ProbeResultRedirect(EdgeDomain redirectDomain) -> { - crawledDomainWriter.accept( - CrawledDomain.builder() - .crawlerStatus(CrawlerDomainStatus.REDIRECT.name()) - .crawlerStatusDesc("Redirected to different domain") - .redirectDomain(redirectDomain.toString()) - .domain(domain) - .ip(findIp(domain)) - .build() - ); - yield 1; - } - }; + try { + return crawlDomain(oldCrawlData, probeResult, domainLinks); + } + catch (Exception ex) { + logger.error("Error crawling domain {}", domain, ex); + return 0; + } } public void syncAbortedRun(Path warcFile) { @@ -128,9 +103,21 @@ 
public class CrawlerRetreiver implements AutoCloseable { resync.run(warcFile); } - private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) { + private int crawlDomain(CrawlDataReference oldCrawlData, DomainProber.ProbeResult probeResult, DomainLinks domainLinks) throws IOException { String ip = findIp(domain); + EdgeUrl rootUrl; + + warcRecorder.writeWarcinfoHeader(ip, new EdgeDomain(domain), probeResult); + + if (!(probeResult instanceof DomainProber.ProbeResultOk ok)) { + return 1; + } + else { + rootUrl = ok.probedUrl(); + } + + assert !crawlFrontier.isEmpty(); final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain, warcRecorder); @@ -170,7 +157,7 @@ public class CrawlerRetreiver implements AutoCloseable { var top = crawlFrontier.takeNextUrl(); if (!robotsRules.isAllowed(top.toString())) { - crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(top)); + warcRecorder.flagAsRobotsTxtError(top); continue; } @@ -193,15 +180,13 @@ public class CrawlerRetreiver implements AutoCloseable { continue; - if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isPresent()) { + if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isOk()) { fetchedCount++; } } ret.cookies = fetcher.getCookies(); - crawledDomainWriter.accept(ret); - return fetchedCount; } @@ -216,16 +201,16 @@ public class CrawlerRetreiver implements AutoCloseable { var url = rootUrl.withPathAndParam("/", null); - var maybeSample = fetchUrl(url, delayTimer, DocumentWithReference.empty()).filter(sample -> sample.httpStatus == 200); - if (maybeSample.isEmpty()) + var result = tryDownload(url, delayTimer, ContentTags.empty()); + if (!(result instanceof HttpFetchResult.ResultOk ok)) return; - var sample = maybeSample.get(); - if (sample.documentBody == null) + var optDoc = ok.parseDocument(); + if (optDoc.isEmpty()) return; // Sniff the software based on the sample document - var doc = Jsoup.parse(sample.documentBody); + var doc = optDoc.get(); crawlFrontier.setLinkFilter(linkFilterSelector.selectFilter(doc)); for (var link : doc.getElementsByTag("link")) { @@ -252,41 +237,54 @@ public class CrawlerRetreiver implements AutoCloseable { } } - public Optional fetchWriteAndSleep(EdgeUrl top, + public HttpFetchResult fetchWriteAndSleep(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) { logger.debug("Fetching {}", top); long startTime = System.currentTimeMillis(); - var docOpt = fetchUrl(top, timer, reference); + var contentTags = reference.getContentTags(); + var fetchedDoc = tryDownload(top, timer, contentTags); - if (docOpt.isPresent()) { - var doc = docOpt.get(); - - if (!Objects.equals(doc.recrawlState, CrawlerRevisitor.documentWasRetainedTag) - && reference.isContentBodySame(doc)) - { - // The document didn't change since the last time - doc.recrawlState = CrawlerRevisitor.documentWasSameTag; + if (fetchedDoc instanceof HttpFetchResult.ResultSame) { + var doc = reference.doc(); + if (doc != null) { + warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody); + fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.documentBody); } + } - crawledDomainWriter.accept(doc); + try { + if (fetchedDoc instanceof HttpFetchResult.ResultOk ok) { + var docOpt = ok.parseDocument(); + if (docOpt.isPresent()) { + var doc = docOpt.get(); - if (doc.url != null) { - // We may have redirected to a different path - EdgeUrl.parse(doc.url).ifPresent(crawlFrontier::addVisited); + 
crawlFrontier.enqueueLinksFromDocument(top, doc); + crawlFrontier.addVisited(new EdgeUrl(ok.uri())); + } } + else if (fetchedDoc instanceof HttpFetchResult.ResultRetained retained) { + var docOpt = retained.parseDocument(); + if (docOpt.isPresent()) { + var doc = docOpt.get(); - if ("ERROR".equals(doc.crawlerStatus) && doc.httpStatus != 404) { - errorCount++; + crawlFrontier.enqueueLinksFromDocument(top, doc); + EdgeUrl.parse(retained.url()).ifPresent(crawlFrontier::addVisited); + } } - + else if (fetchedDoc instanceof HttpFetchResult.ResultException ex) { + errorCount ++; + } + } + catch (Exception ex) { + logger.error("Error parsing document {}", top, ex); } timer.delay(System.currentTimeMillis() - startTime); - return docOpt; + return fetchedDoc; } private boolean isAllowedProtocol(String proto) { @@ -294,42 +292,11 @@ public class CrawlerRetreiver implements AutoCloseable { || proto.equalsIgnoreCase("https"); } - private Optional fetchUrl(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) { - try { - var contentTags = reference.getContentTags(); - var fetchedDoc = tryDownload(top, timer, contentTags); - - CrawledDocument doc = reference.replaceOn304(fetchedDoc); - - if (doc.documentBody != null) { - doc.documentBodyHash = createHash(doc.documentBody); - - var parsedDoc = Jsoup.parse(doc.documentBody); - EdgeUrl url = new EdgeUrl(doc.url); - - crawlFrontier.enqueueLinksFromDocument(url, parsedDoc); - findCanonicalUrl(url, parsedDoc) - .ifPresent(canonicalLink -> doc.canonicalUrl = canonicalLink.toString()); - } - - return Optional.of(doc); - } - catch (Exception ex) { - logger.warn("Failed to process document {}", top); - } - - return Optional.empty(); - - } - - @SneakyThrows - private CrawledDocument tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) { + private HttpFetchResult tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) { for (int i = 0; i < 2; i++) { try { - var doc = fetcher.fetchContent(top, warcRecorder, tags); - doc.recrawlState = "NEW"; - return doc; + return fetcher.fetchContent(top, warcRecorder, tags); } catch (RateLimitException ex) { timer.slowDown(); @@ -339,15 +306,20 @@ public class CrawlerRetreiver implements AutoCloseable { Thread.sleep(delay); } } + catch (Exception ex) { + logger.warn("Failed to fetch {}", top, ex); + return new HttpFetchResult.ResultException(ex); + } } - return CrawledDocumentFactory.createRetryError(top); + return new HttpFetchResult.ResultNone(); } private String createHash(String documentBodyHash) { return hashMethod.hashUnencodedChars(documentBodyHash).toString(); } + // FIXME this does not belong in the crawler private Optional findCanonicalUrl(EdgeUrl baseUrl, Document parsed) { baseUrl = baseUrl.domain.toRootUrl(); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java index 01bafbe1..1a66c7a5 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java @@ -1,8 +1,8 @@ package nu.marginalia.crawl.retreival; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; +import 
nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.DocumentBodyResult; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.model.EdgeUrl; import org.jsoup.Jsoup; @@ -87,7 +87,7 @@ public class CrawlerWarcResynchronizer { } private void revisit(WarcRevisit revisit) throws IOException { - if (!WarcRecorder.revisitURI.equals(revisit.profile())) { + if (!WarcRecorder.documentRevisitURN.equals(revisit.profile())) { return; } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java index 55f2e633..df070cc5 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java @@ -1,6 +1,6 @@ package nu.marginalia.crawl.retreival.fetcher; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; +import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.model.EdgeUrl; import okhttp3.OkHttpClient; import okhttp3.Request; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java index be815954..70576510 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java @@ -3,12 +3,11 @@ package nu.marginalia.crawl.retreival.fetcher; import com.google.inject.ImplementedBy; import crawlercommons.robots.SimpleRobotRules; import nu.marginalia.crawl.retreival.RateLimitException; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; -import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; -import java.nio.file.Path; import java.util.List; @ImplementedBy(HttpFetcherImpl.class) @@ -20,7 +19,7 @@ public interface HttpFetcher { FetchResult probeDomain(EdgeUrl url); - CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException; + HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException; SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java index 3faffe4a..d7732baa 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java @@ -8,30 +8,26 @@ import lombok.SneakyThrows; import nu.marginalia.crawl.retreival.Cookies; import nu.marginalia.crawl.retreival.RateLimitException; import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult; 
-import nu.marginalia.crawl.retreival.fetcher.socket.*; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; -import static nu.marginalia.crawl.retreival.CrawledDocumentFactory.*; +import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory; +import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; -import nu.marginalia.crawling.model.CrawledDocument; -import nu.marginalia.crawling.model.CrawlerDocumentStatus; +import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; -import okhttp3.*; +import okhttp3.ConnectionPool; +import okhttp3.Dispatcher; +import okhttp3.OkHttpClient; +import okhttp3.Request; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLException; import javax.net.ssl.X509TrustManager; -import java.io.EOFException; -import java.io.IOException; -import java.net.*; -import java.nio.charset.IllegalCharsetNameException; -import java.time.LocalDateTime; -import java.util.*; +import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -141,9 +137,9 @@ public class HttpFetcherImpl implements HttpFetcher { @Override @SneakyThrows - public CrawledDocument fetchContent(EdgeUrl url, - WarcRecorder warcRecorder, - ContentTags contentTags) + public HttpFetchResult fetchContent(EdgeUrl url, + WarcRecorder warcRecorder, + ContentTags contentTags) throws RateLimitException { @@ -152,23 +148,21 @@ public class HttpFetcherImpl implements HttpFetcher { if (contentTags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url)) { ContentTypeProbeResult probeResult = contentTypeProber.probeContentType(url); - switch (probeResult) { - case ContentTypeProbeResult.Ok(EdgeUrl redirectUrl) -> { - url = redirectUrl; - } - case ContentTypeProbeResult.BadContentType (String contentType, int statusCode) -> { - return createErrorResponse(url, contentType, statusCode, - CrawlerDocumentStatus.BAD_CONTENT_TYPE, - contentType - ); - } - case ContentTypeProbeResult.Timeout timeout -> { - return createTimeoutErrorRsp(url); - } - case ContentTypeProbeResult.Exception ex -> { - return createErrorFromException(url, ex.ex()); - } - }; + if (probeResult instanceof ContentTypeProbeResult.Ok ok) { + url = ok.resolvedUrl(); + } + else if (probeResult instanceof ContentTypeProbeResult.BadContentType badContentType) { + warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode()); + return new HttpFetchResult.ResultNone(); + } + else if (probeResult instanceof ContentTypeProbeResult.BadContentType.Timeout timeout) { + warcRecorder.flagAsTimeout(url); + return new HttpFetchResult.ResultNone(); + } + else if (probeResult instanceof ContentTypeProbeResult.Exception exception) { + warcRecorder.flagAsError(url, exception.ex()); + return new HttpFetchResult.ResultNone(); + } } var getBuilder = new Request.Builder().get(); @@ -181,78 +175,20 @@ public class HttpFetcherImpl implements HttpFetcher { HttpFetchResult result = warcRecorder.fetch(client, getBuilder.build()); - if (result instanceof HttpFetchResult.ResultError err) { - return createErrorFromException(url, err.ex()); - } - else 
if (result instanceof HttpFetchResult.ResultOk ok) { - try { - return extractBody(userAgent, url, ok); + if (result instanceof HttpFetchResult.ResultOk ok) { + if (ok.statusCode() == 429) { + String retryAfter = Objects.requireNonNullElse(ok.header("Retry-After"), "1000"); + throw new RateLimitException(retryAfter); } - catch (Exception ex) { - return createErrorFromException(url, ex); + if (ok.statusCode() == 304) { + return new HttpFetchResult.ResultSame(); + } + if (ok.statusCode() == 200) { + return ok; } } - else { - throw new IllegalStateException(STR."Unknown result type \{result.getClass()}"); - } - } - private CrawledDocument createErrorFromException(EdgeUrl url, Exception exception) throws RateLimitException { - return switch (exception) { - case RateLimitException rle -> throw rle; - case SocketTimeoutException ex -> createTimeoutErrorRsp(url); - case UnknownHostException ex -> createUnknownHostError(url); - case SocketException ex -> createHardErrorRsp(url, ex); - case ProtocolException ex -> createHardErrorRsp(url, ex); - case IllegalCharsetNameException ex -> createHardErrorRsp(url, ex); - case SSLException ex -> createHardErrorRsp(url, ex); - case EOFException ex -> createHardErrorRsp(url, ex); - default -> { - logger.error("Error during fetching", exception); - yield createHardErrorRsp(url, exception); - } - }; - } - - public static CrawledDocument extractBody(String userAgent, EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException { - - var responseUrl = new EdgeUrl(rsp.uri()); - - if (!Objects.equals(responseUrl.domain, url.domain)) { - return createRedirectResponse(url, rsp, responseUrl); - } - - if (rsp.statusCode() == 429) { - String retryAfter = Objects.requireNonNullElse(rsp.header("Retry-After"), "1000"); - - throw new RateLimitException(retryAfter); - } - - if (!isXRobotsTagsPermitted(rsp.allHeaders("X-Robots-Tag"), userAgent)) { - return CrawledDocument.builder() - .crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name()) - .crawlerStatusDesc("X-Robots-Tag") - .url(responseUrl.toString()) - .httpStatus(-1) - .timestamp(LocalDateTime.now().toString()) - .headers(rsp.headers().toString()) - .build(); - } - - return switch(DocumentBodyExtractor.extractBody(rsp)) { - case DocumentBodyResult.Error(CrawlerDocumentStatus status, String why) -> - createErrorResponse(url, rsp, status, why); - case DocumentBodyResult.Ok(String contentType, String body) -> - CrawledDocument.builder() - .crawlerStatus(CrawlerDocumentStatus.OK.name()) - .headers(rsp.headers().toString()) - .contentType(contentType) - .timestamp(LocalDateTime.now().toString()) - .httpStatus(rsp.statusCode()) - .url(responseUrl.toString()) - .documentBody(body) - .build(); - }; + return new HttpFetchResult.ResultNone(); } /** Check X-Robots-Tag header tag to see if we are allowed to index this page. 
@@ -318,17 +254,31 @@ public class HttpFetcherImpl implements HttpFetcher { private Optional fetchRobotsForProto(String proto, WarcRecorder recorder, EdgeDomain domain) { try { var url = new EdgeUrl(proto, domain, null, "/robots.txt", null); - return Optional.of(parseRobotsTxt(fetchContent(url, recorder, ContentTags.empty()))); + + var getBuilder = new Request.Builder().get(); + + getBuilder.url(url.toString()) + .addHeader("Accept-Encoding", "gzip") + .addHeader("User-agent", userAgent); + + HttpFetchResult result = recorder.fetch(client, getBuilder.build()); + + if (result instanceof HttpFetchResult.ResultOk ok) { + return Optional.of(parseRobotsTxt(ok)); + } + else { + return Optional.empty(); + } } catch (Exception ex) { return Optional.empty(); } } - private SimpleRobotRules parseRobotsTxt(CrawledDocument doc) { - return robotsParser.parseContent(doc.url, - doc.documentBody.getBytes(), - doc.contentType, + private SimpleRobotRules parseRobotsTxt(HttpFetchResult.ResultOk ok) { + return robotsParser.parseContent(ok.uri().toString(), + ok.bytesRaw(), + ok.header("Content-Type"), userAgent); } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java deleted file mode 100644 index 99ae2cae..00000000 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java +++ /dev/null @@ -1,44 +0,0 @@ -package nu.marginalia.crawl.retreival.fetcher.body; - -import nu.marginalia.contenttype.ContentTypeParser; -import nu.marginalia.contenttype.DocumentBodyToString; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; -import nu.marginalia.crawling.model.CrawlerDocumentStatus; -import org.apache.commons.io.input.BOMInputStream; - -import java.io.IOException; -import java.util.zip.GZIPInputStream; - -public class DocumentBodyExtractor { - private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); - - public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) throws IOException { - var byteStream = rsp.getInputStream(); - - if ("gzip".equals(rsp.header("Content-Encoding"))) { - byteStream = new GZIPInputStream(byteStream); - } - byteStream = new BOMInputStream(byteStream); - - var contentTypeHeader = rsp.header("Content-Type"); - if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { - return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); - } - - byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder - - var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); - if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { - return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); - } - - if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { - return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, ""); - } - - - return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data)); - } - -} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java deleted file mode 
100644 index fc5d67ec..00000000 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java +++ /dev/null @@ -1,8 +0,0 @@ -package nu.marginalia.crawl.retreival.fetcher.body; - -import nu.marginalia.crawling.model.CrawlerDocumentStatus; - -public sealed interface DocumentBodyResult { - record Ok(String contentType, String body) implements DocumentBodyResult { } - record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { } -} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java index 3d4b5aaa..b7bb82bd 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java @@ -1,6 +1,9 @@ package nu.marginalia.crawl.retreival.fetcher.warc; +import nu.marginalia.crawl.retreival.DomainProber; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -8,7 +11,6 @@ import org.netpreserve.jwarc.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; @@ -18,9 +20,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.security.NoSuchAlgorithmException; import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** Based on JWarc's fetch method, APL 2.0 license *

@@ -29,7 +29,12 @@ import java.util.Map; * be reconstructed. */ public class WarcRecorder implements AutoCloseable { - public static final URI revisitURI = URI.create("urn:marginalia:revisit"); + public static final URI documentRevisitURN = URI.create("urn:marginalia/data/doc/revisit"); + + public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped"); + public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe"); + public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe"); + public static final URI documentUnspecifiedError = URI.create("urn:marginalia/meta/doc/error"); private static final int MAX_TIME = 30_000; private static final int MAX_SIZE = 1024 * 1024 * 10; @@ -37,10 +42,14 @@ public class WarcRecorder implements AutoCloseable { private final Path warcFile; private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class); - private ThreadLocal bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]); + private final ThreadLocal bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]); private boolean temporaryFile = false; + // Affix a version string in case we need to change the format in the future + // in some way + private final String warcRecorderVersion = "1.0"; + /** * Create a new WarcRecorder that will write to the given file * @@ -48,7 +57,7 @@ public class WarcRecorder implements AutoCloseable { */ public WarcRecorder(Path warcFile) throws IOException { this.warcFile = warcFile; - this.writer = new WarcWriter(this.warcFile); + this.writer = new WarcWriter(warcFile); } /** @@ -170,7 +179,7 @@ public class WarcRecorder implements AutoCloseable { } catch (Exception ex) { logger.warn("Failed to fetch URL {}", uri, ex); - return new HttpFetchResult.ResultError(ex); + return new HttpFetchResult.ResultException(ex); } } @@ -178,55 +187,141 @@ public class WarcRecorder implements AutoCloseable { writer.write(item); } - /** - * Flag the given URL as skipped by the crawler, so that it will not be retried. - * Which URLs were skipped is still important when resynchronizing on the WARC file, - * so that the crawler can avoid re-fetching them. 
- * - * @param url The URL to flag - * @param headers - * @param documentBody - */ - public void flagAsSkipped(EdgeUrl url, String headers, int statusCode, String documentBody) { + private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, String documentBody) { try { WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder(); WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder(); - String header = WarcProtocolReconstructor.getResponseHeader(headers, statusCode); + byte[] bytes = documentBody.getBytes(); + + String fakeHeaders = STR.""" + Content-Type: \{contentType} + Content-Length: \{bytes.length} + Content-Encoding: UTF-8 + """; + + String header = WarcProtocolReconstructor.getResponseHeader(fakeHeaders, statusCode); ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(); responseDataBuffer.put(header); responseDigestBuilder.update(header); - try (var inputStream = new ByteArrayInputStream(documentBody.getBytes())) { - int remainingLength; - while ((remainingLength = responseDataBuffer.remaining()) > 0) { - int startPos = responseDataBuffer.pos(); + responseDigestBuilder.update(bytes, bytes.length); + payloadDigestBuilder.update(bytes, bytes.length); + responseDataBuffer.put(bytes, 0, bytes.length); - int n = responseDataBuffer.readFrom(inputStream, remainingLength); - if (n < 0) - break; - - responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n); - responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n); - } - } - - WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), revisitURI) + WarcXResponseReference reference = new WarcXResponseReference.Builder(url.asURI()) .blockDigest(responseDigestBuilder.build()) .payloadDigest(payloadDigestBuilder.build()) .date(Instant.now()) .body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes()) .build(); - revisit.http(); // force HTTP header to be parsed before body is consumed so that caller can use it + reference.http(); // force HTTP header to be parsed before body is consumed so that caller can use it - writer.write(revisit); + writer.write(reference); } catch (URISyntaxException | IOException | NoSuchAlgorithmException e) { throw new RuntimeException(e); } } + /** + * Flag the given URL as skipped by the crawler, so that it will not be retried. + * Which URLs were skipped is still important when resynchronizing on the WARC file, + * so that the crawler can avoid re-fetching them. + */ + public void flagAsSkipped(EdgeUrl url, String contentType, int statusCode, String documentBody) { + saveOldResponse(url, contentType, statusCode, documentBody); + } + + /** + * Write a reference copy of the given document data. This is used when the crawler provides + * an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this + * scenario we want to record the data as it was in the previous crawl, but not re-fetch it. 
+ */ + public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, String documentBody) { + saveOldResponse(url, contentType, statusCode, documentBody); + } + + public void writeWarcinfoHeader(String ip, EdgeDomain domain, DomainProber.ProbeResult result) throws IOException { + + Map> fields = new HashMap<>(); + fields.put("ip", List.of(ip)); + fields.put("software", List.of(STR."search.marginalia.nu/\{warcRecorderVersion}")); + fields.put("domain", List.of(domain.toString())); + + switch (result) { + case DomainProber.ProbeResultRedirect redirectDomain: + fields.put("X-WARC-Probe-Status", List.of(STR."REDIRECT;\{redirectDomain.domain()}")); + break; + case DomainProber.ProbeResultError error: + fields.put("X-WARC-Probe-Status", List.of(STR."\{error.status().toString()};\{error.desc()}")); + break; + case DomainProber.ProbeResultOk ok: + fields.put("X-WARC-Probe-Status", List.of("OK")); + break; + } + + var warcinfo = new Warcinfo.Builder() + .date(Instant.now()) + .fields(fields) + .recordId(UUID.randomUUID()) + .build(); + + writer.write(warcinfo); + } + + public void flagAsRobotsTxtError(EdgeUrl top) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(top.asURI(), documentRobotsTxtSkippedURN) + .date(Instant.now()) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } + + public void flagAsFailedContentTypeProbe(EdgeUrl url, String contentType, int status) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentBadContentTypeURN) + .date(Instant.now()) + .addHeader("Rejected-Content-Type", contentType) + .addHeader("Http-Status", Integer.toString(status)) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } + + public void flagAsError(EdgeUrl url, Exception ex) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentUnspecifiedError) + .date(Instant.now()) + .addHeader("Exception", ex.getClass().getSimpleName()) + .addHeader("ErrorMessage", Objects.requireNonNullElse(ex.getMessage(), "")) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } + + public void flagAsTimeout(EdgeUrl url) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentProbeTimeout) + .date(Instant.now()) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } private class ResponseDataBuffer { private final byte[] data; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java index c77af845..70a98310 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java @@ -5,15 +5,11 @@ import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.DomainCrawlFrontier; -import nu.marginalia.crawl.retreival.CrawledDocumentFactory; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawling.model.CrawledDocument; -import 
nu.marginalia.crawling.model.SerializableCrawlData; import nu.marginalia.model.EdgeUrl; import org.jsoup.Jsoup; -import java.util.function.Consumer; - /** This class encapsulates the logic for re-visiting a domain that has already been crawled. * We may use information from the previous crawl to inform the next crawl, specifically the * E-Tag and Last-Modified headers. @@ -27,16 +23,13 @@ public class CrawlerRevisitor { private final DomainCrawlFrontier crawlFrontier; - private final Consumer crawledDomainWriter; private final CrawlerRetreiver crawlerRetreiver; private final WarcRecorder warcRecorder; public CrawlerRevisitor(DomainCrawlFrontier crawlFrontier, - Consumer crawledDomainWriter, CrawlerRetreiver crawlerRetreiver, WarcRecorder warcRecorder) { this.crawlFrontier = crawlFrontier; - this.crawledDomainWriter = crawledDomainWriter; this.crawlerRetreiver = crawlerRetreiver; this.warcRecorder = warcRecorder; } @@ -69,7 +62,7 @@ public class CrawlerRevisitor { if (doc.httpStatus != 200) continue; if (!robotsRules.isAllowed(url.toString())) { - crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(url)); + warcRecorder.flagAsRobotsTxtError(url); continue; } if (!crawlFrontier.filterLink(url)) @@ -87,7 +80,6 @@ public class CrawlerRevisitor { // fashion to make sure we eventually catch changes over time // and ensure we discover new links - crawledDomainWriter.accept(doc); crawlFrontier.addVisited(url); // Hoover up any links from the document @@ -97,7 +89,7 @@ public class CrawlerRevisitor { } // Add a WARC record so we don't repeat this - warcRecorder.flagAsSkipped(url, doc.headers, doc.httpStatus, doc.documentBody); + warcRecorder.flagAsSkipped(url, doc.contentType, doc.httpStatus, doc.documentBody); continue; } @@ -107,15 +99,14 @@ public class CrawlerRevisitor { // providing etag and last-modified headers, so we can recycle the // document if it hasn't changed without actually downloading it - var fetchedDocOpt = crawlerRetreiver.fetchWriteAndSleep(url, - delayTimer, - new DocumentWithReference(doc, oldCrawlData)); - if (fetchedDocOpt.isEmpty()) continue; + var reference = new DocumentWithReference(doc, oldCrawlData); + var result = crawlerRetreiver.fetchWriteAndSleep(url, delayTimer, reference); - if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; - else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; + if (reference.isSame(result)) { + retained++; + } - recrawled ++; + recrawled++; } return recrawled; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java index e832541f..03b96760 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java @@ -1,12 +1,15 @@ package nu.marginalia.crawl.retreival.revisit; +import lombok.SneakyThrows; import nu.marginalia.crawl.retreival.CrawlDataReference; -import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.fetcher.ContentTags; +import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawling.model.CrawledDocument; +import 
nu.marginalia.model.EdgeUrl; import javax.annotation.Nullable; -import java.time.LocalDateTime; public record DocumentWithReference( @Nullable CrawledDocument doc, @@ -18,17 +21,28 @@ public record DocumentWithReference( return emptyInstance; } - public boolean isContentBodySame(CrawledDocument newDoc) { + /** Returns true if the provided document is the same as the reference document, + * or if the result was retained via HTTP 304. + */ + public boolean isSame(HttpFetchResult result) { + if (result instanceof HttpFetchResult.ResultSame) + return true; + if (result instanceof HttpFetchResult.ResultRetained) + return true; + + if (!(result instanceof HttpFetchResult.ResultOk resultOk)) + return false; + if (reference == null) return false; if (doc == null) return false; if (doc.documentBody == null) return false; - if (newDoc.documentBody == null) - return false; - return reference.isContentBodySame(doc, newDoc); + return DocumentBodyExtractor.extractBody(resultOk) + .map((contentType, body) -> reference.isContentBodySame(doc.documentBody, body)) + .orElse(false); } public ContentTags getContentTags() { @@ -60,23 +74,4 @@ public record DocumentWithReference( return doc == null || reference == null; } - /** - * If the provided document has HTTP status 304, and the reference document is provided, - * return the reference document; otherwise return the provided document. - */ - public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) { - - if (doc == null) - return fetchedDoc; - - // HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when - // we fetched it last time. We can recycle the reference document. - if (fetchedDoc.httpStatus != 304) - return fetchedDoc; - - var ret = doc; - ret.recrawlState = CrawlerRevisitor.documentWasRetainedTag; - ret.timestamp = LocalDateTime.now().toString(); - return ret; - } } diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java index 55f2eebe..4faa2042 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java @@ -8,9 +8,7 @@ import okhttp3.Request; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.netpreserve.jwarc.WarcReader; -import org.netpreserve.jwarc.WarcRequest; -import org.netpreserve.jwarc.WarcResponse; +import org.netpreserve.jwarc.*; import java.io.IOException; import java.net.URISyntaxException; @@ -22,6 +20,7 @@ import java.util.Map; import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; class WarcRecorderTest { Path fileName; @@ -33,7 +32,7 @@ class WarcRecorderTest { .addNetworkInterceptor(new IpInterceptingNetworkInterceptor()) .build(); - fileName = Files.createTempFile("test", ".warc.gz"); + fileName = Files.createTempFile("test", ".warc"); client = new WarcRecorder(fileName); } @@ -73,10 +72,7 @@ class WarcRecorderTest { try (var recorder = new WarcRecorder(fileName)) { recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), - """ - Content-type: text/html - X-Cookies: 1 - """, + "text/html", 200, "test"); } @@ -95,5 +91,27 @@ class WarcRecorderTest { new 
GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out); } + @Test + public void testSaveImport() throws URISyntaxException, IOException { + try (var recorder = new WarcRecorder(fileName)) { + recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), + "text/html", + 200, + "test"); + } + + try (var reader = new WarcReader(fileName)) { + WarcXResponseReference.register(reader); + + for (var record : reader) { + System.out.println(record.type()); + System.out.println(record.getClass().getSimpleName()); + if (record instanceof WarcXResponseReference rsp) { + assertEquals("https://www.marginalia.nu/", rsp.target()); + } + } + } + + } } \ No newline at end of file diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java index 2f3076cd..4590dde2 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java @@ -4,8 +4,10 @@ import lombok.SneakyThrows; import nu.marginalia.crawl.retreival.RateLimitException; import nu.marginalia.crawl.retreival.fetcher.ContentTags; import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl; +import nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.DocumentBodyResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; +import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.model.EdgeUrl; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -33,10 +35,11 @@ class HttpFetcherTest { void fetchUTF8() throws URISyntaxException, RateLimitException, IOException { var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); try (var recorder = new WarcRecorder()) { - var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty()); - System.out.println(str.contentType); + var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty()); + if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) { + System.out.println(bodyOk.contentType()); + } } - } @Test @@ -44,8 +47,10 @@ class HttpFetcherTest { var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); try (var recorder = new WarcRecorder()) { - var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty()); - System.out.println(str.contentType); + var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty()); + if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) { + System.out.println(bodyOk.contentType()); + } } } } \ No newline at end of file diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java index 2a00e6de..b7727022 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java @@ -5,6 +5,7 @@ import lombok.SneakyThrows; import 
nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.DomainProber; import nu.marginalia.crawl.retreival.fetcher.*; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawlerDocumentStatus; @@ -13,6 +14,7 @@ import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import nu.marginalia.test.CommonTestData; +import okhttp3.Headers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -21,12 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URISyntaxException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; +import java.util.*; public class CrawlerMockFetcherTest { @@ -65,9 +62,9 @@ public class CrawlerMockFetcherTest { } - void crawl(CrawlSpecRecord spec, Consumer consumer) throws IOException { + void crawl(CrawlSpecRecord spec) throws IOException { try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder, consumer) + new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder) .fetch(); } } @@ -80,9 +77,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html"); registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html"); - crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()), out::add); - - out.forEach(System.out::println); + crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>())); } @Test @@ -91,9 +86,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html"); - crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()), out::add); - - out.forEach(System.out::println); + crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>())); } @Test @@ -104,9 +97,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html"); registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html"); - crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()), out::add); - - out.forEach(System.out::println); + crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>())); } class MockFetcher implements HttpFetcher { @@ -126,21 +117,23 @@ public class CrawlerMockFetcherTest { return new FetchResult(FetchResultState.OK, url); } + @SneakyThrows @Override - public CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) { + public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) { logger.info("Fetching {}", url); if (mockData.containsKey(url)) { - return mockData.get(url); - } - else { - return CrawledDocument.builder() - .crawlId("1") - .url(url.toString()) - .contentType("text/html") - .httpStatus(404) - .crawlerStatus(CrawlerDocumentStatus.ERROR.name()) - .build(); + byte[] bodyBytes = 
mockData.get(url).documentBody.getBytes(); + return new HttpFetchResult.ResultOk( + url.asURI(), + 200, + new Headers.Builder().build(), + bodyBytes, + 0, + bodyBytes.length + ); } + + return new HttpFetchResult.ResultNone(); } @Override diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java index 59bf99f6..286f15f5 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java @@ -16,15 +16,14 @@ import nu.marginalia.crawling.model.CrawledDomain; import nu.marginalia.crawling.model.SerializableCrawlData; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import org.junit.jupiter.api.*; -import org.netpreserve.jwarc.WarcReader; -import org.netpreserve.jwarc.WarcRequest; -import org.netpreserve.jwarc.WarcResponse; +import org.netpreserve.jwarc.*; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -33,6 +32,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class CrawlerRetreiverTest { private HttpFetcher httpFetcher; + Path tempFile; + Path tempFile2; @BeforeEach public void setUp() { httpFetcher = new HttpFetcherImpl("search.marginalia.nu; testing a bit :D"); @@ -45,6 +46,15 @@ class CrawlerRetreiverTest { System.setProperty("http.agent", WmsaHome.getUserAgent().uaString()); } + @AfterEach + public void tearDown() throws IOException { + if (tempFile != null) { + Files.deleteIfExists(tempFile); + } + if (tempFile2 != null) { + Files.deleteIfExists(tempFile2); + } + } @Test public void testWarcOutput() throws IOException { var specs = CrawlSpecRecord @@ -57,10 +67,8 @@ class CrawlerRetreiverTest { try { tempFile = Files.createTempFile("crawling-process", "warc"); - List data = new ArrayList<>(); - try (var recorder = new WarcRecorder(tempFile)) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } @@ -93,7 +101,7 @@ class CrawlerRetreiverTest { } } @Test - public void testWithKnownDomains() { + public void testWithKnownDomains() throws IOException { var specs = CrawlSpecRecord .builder() .crawlDepth(5) @@ -103,15 +111,30 @@ class CrawlerRetreiverTest { List data = new ArrayList<>(); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + tempFile = Files.createTempFile("crawling-process", ".warc"); + + try (var recorder = new WarcRecorder(tempFile)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } + + try (var stream = CrawledDomainReader.createDataStream(tempFile)) { + while (stream.hasNext()) { + if (stream.next() instanceof CrawledDocument doc) { + data.add(doc); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + var fetchedUrls = - data.stream().filter(CrawledDocument.class::isInstance) + data.stream() + 
.peek(System.out::println) + .filter(CrawledDocument.class::isInstance) .map(CrawledDocument.class::cast) .map(doc -> doc.url) .collect(Collectors.toSet()); @@ -126,7 +149,7 @@ class CrawlerRetreiverTest { } @Test - public void testEmptySet() { + public void testEmptySet() throws IOException { var specs = CrawlSpecRecord .builder() @@ -135,15 +158,30 @@ class CrawlerRetreiverTest { .urls(List.of()) .build(); + List data = new ArrayList<>(); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + tempFile = Files.createTempFile("crawling-process", ".warc"); + + try (var recorder = new WarcRecorder(tempFile)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } + + try (var stream = CrawledDomainReader.createDataStream(tempFile)) { + while (stream.hasNext()) { + if (stream.next() instanceof CrawledDocument doc) { + data.add(doc); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + data.stream().filter(CrawledDocument.class::isInstance) .map(CrawledDocument.class::cast) .forEach(doc -> System.out.println(doc.url + "\t" + doc.crawlerStatus + "\t" + doc.httpStatus)); @@ -174,43 +212,70 @@ class CrawlerRetreiverTest { .build(); - Path out = Files.createTempDirectory("crawling-process"); - var writer = new CrawledDomainWriter(out, specs.domain, "idid"); + tempFile = Files.createTempFile("crawling-process", ".warc.gz"); + tempFile2 = Files.createTempFile("crawling-process", ".warc.gz"); + Map, List> data = new HashMap<>(); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, d -> { - data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d); - if (d instanceof CrawledDocument doc) { - System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus); - if (Math.random() > 0.5) { - doc.headers = ""; - } - } - writer.accept(d); - }).fetch(); + try (var recorder = new WarcRecorder(tempFile)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } - - writer.close(); - - var reader = new CrawledDomainReader(); - var stream = reader.createDataStream(out, specs.domain, "idid"); + try (var stream = CrawledDomainReader.createDataStream(tempFile)) { + while (stream.hasNext()) { + var doc = stream.next(); + data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + var stream = CrawledDomainReader.createDataStream(tempFile); CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0); domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList()); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, d -> { - if (d instanceof CrawledDocument doc) { - System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus); - } - }).fetch(new DomainLinks(), new CrawlDataReference(stream)); + try (var recorder = new WarcRecorder(tempFile2)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(new DomainLinks(), + new CrawlDataReference(stream)); } catch (IOException ex) { Assertions.fail(ex); } + + new GZIPInputStream(Files.newInputStream(tempFile2)).transferTo(System.out); 
+ + try (var reader = new WarcReader(tempFile2)) { + WarcXResponseReference.register(reader); + + reader.forEach(record -> { + if (record instanceof WarcResponse rsp) { + try { + System.out.println(rsp.type() + ":" + rsp.target() + "/" + rsp.http().status()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (record instanceof WarcMetadata rsp) { + System.out.println("meta:" + rsp.target()); + } + }); + } + + try (var ds = CrawledDomainReader.createDataStream(tempFile2)) { + while (ds.hasNext()) { + var doc = ds.next(); + if (doc instanceof CrawledDomain dr) { + System.out.println(dr.domain + "/" + dr.crawlerStatus); + } + else if (doc instanceof CrawledDocument dc) { + System.out.println(dc.url + "/" + dc.crawlerStatus + "/" + dc.httpStatus); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + + } } } \ No newline at end of file diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java index 0af77acb..353ef965 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java @@ -63,8 +63,6 @@ public class ExportAtagsActor extends RecordActorPrototype { Path inputDir = storageService.getStorage(crawlId).asPath(); - var reader = new CrawledDomainReader(); - try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)))); ) { @@ -78,7 +76,7 @@ public class ExportAtagsActor extends RecordActorPrototype { } Path crawlDataPath = inputDir.resolve(item.relPath()); - try (var stream = reader.createDataStream(crawlDataPath)) { + try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) { exportLinks(tagWriter, stream); } catch (Exception ex) { diff --git a/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java b/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java index 1a73a952..4322d3fc 100644 --- a/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java +++ b/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java @@ -29,13 +29,11 @@ public class CrawlDataUnfcker { return; } - var reader = new CrawledDomainReader(); - try (var wl = new WorkLog(output.resolve("crawler.log"))) { for (var inputItem : WorkLog.iterable(input.resolve("crawler.log"))) { Path inputPath = input.resolve(inputItem.relPath()); - var domainMaybe = readDomain(reader, inputPath).map(CrawledDomain::getDomain); + var domainMaybe = readDomain(inputPath).map(CrawledDomain::getDomain); if (domainMaybe.isEmpty()) continue; var domain = domainMaybe.get(); @@ -43,7 +41,7 @@ public class CrawlDataUnfcker { // Generate conformant ID String newId = Integer.toHexString(domain.hashCode()); - var outputPath = CrawlerOutputFile.createOutputPath(output, newId, domain); + var outputPath = CrawlerOutputFile.createLegacyOutputPath(output, newId, domain); var outputFileName = outputPath.toFile().getName(); System.out.println(inputPath + " -> " + outputPath); @@ -56,13 +54,13 @@ public class CrawlDataUnfcker { } } - static Optional readDomain(CrawledDomainReader reader, Path file) { + static Optional readDomain(Path file) { if (!Files.exists(file)) { 
System.out.println("Missing file " + file); return Optional.empty(); } - try (var stream = reader.createDataStream(file)) { + try (var stream = CrawledDomainReader.createDataStream(file)) { while (stream.hasNext()) { if (stream.next() instanceof CrawledDomain domain) { return Optional.of(domain); diff --git a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java index 97df4a39..c5751a7a 100644 --- a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java +++ b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java @@ -50,10 +50,9 @@ public class ExperimentRunnerMain { experiment.args(Arrays.copyOfRange(args, 2, args.length)); Path basePath = Path.of(args[0]); - var reader = new CrawledDomainReader(); for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) { Path crawlDataPath = basePath.resolve(item.relPath()); - try (var stream = reader.createDataStream(crawlDataPath)) { + try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) { experiment.process(stream); } catch (Exception ex) { diff --git a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java index 4e61ffc4..5d7d8d11 100644 --- a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java +++ b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java @@ -5,12 +5,12 @@ import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawledDomain; import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; public abstract class LegacyExperiment extends Experiment { public abstract boolean process(CrawledDomain domain); + @Override public boolean process(SerializableCrawlDataStream dataStream) throws IOException { List documentList = new ArrayList<>(); diff --git a/settings.gradle b/settings.gradle index af8a45f5..42ae0f47 100644 --- a/settings.gradle +++ b/settings.gradle @@ -155,7 +155,7 @@ dependencyResolutionManagement { library('duckdb', 'org.duckdb', 'duckdb_jdbc').version('0.9.1') library('okhttp3','com.squareup.okhttp3','okhttp').version('4.11.0') - library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.4') + library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.5') library('httpcomponents.core','org.apache.httpcomponents','httpcore').version('4.4.15') library('httpcomponents.client','org.apache.httpcomponents','httpclient').version('4.5.13')