diff --git a/code/process-models/crawling-model/build.gradle b/code/process-models/crawling-model/build.gradle index ebbea855..f1f77a70 100644 --- a/code/process-models/crawling-model/build.gradle +++ b/code/process-models/crawling-model/build.gradle @@ -20,13 +20,18 @@ dependencies { implementation project(':code:api:index-api') implementation project(':code:common:service-discovery') implementation project(':code:common:service-client') + implementation project(':code:features-crawl:content-type') implementation project(':code:libraries:language-processing') implementation libs.bundles.slf4j implementation libs.notnull + implementation libs.jwarc implementation libs.gson + implementation libs.commons.io + implementation libs.okhttp3 + implementation libs.jsoup implementation libs.snakeyaml implementation libs.zstd diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic/ContentTypeLogic.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/ContentTypeLogic.java similarity index 97% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic/ContentTypeLogic.java rename to code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/ContentTypeLogic.java index c5860913..d7dfa6d1 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/logic/ContentTypeLogic.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/ContentTypeLogic.java @@ -1,4 +1,4 @@ -package nu.marginalia.crawl.retreival.logic; +package nu.marginalia.crawling.body; import nu.marginalia.model.EdgeUrl; diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java new file mode 100644 index 00000000..7bb548e5 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java @@ -0,0 +1,60 @@ +package nu.marginalia.crawling.body; + +import nu.marginalia.contenttype.ContentTypeParser; +import nu.marginalia.contenttype.DocumentBodyToString; +import nu.marginalia.crawling.model.CrawlerDocumentStatus; +import org.apache.commons.io.input.BOMInputStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.zip.GZIPInputStream; + +public class DocumentBodyExtractor { + private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); + + private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class); + + public static DocumentBodyResult extractBody(HttpFetchResult result) { + if (result instanceof HttpFetchResult.ResultOk fetchOk) { + return extractBody(fetchOk); + } + else { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, ""); + } + } + + public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) { + try { + var byteStream = rsp.getInputStream(); + + if ("gzip".equals(rsp.header("Content-Encoding"))) { + byteStream = new GZIPInputStream(byteStream); + } + byteStream = new BOMInputStream(byteStream); + + var contentTypeHeader = rsp.header("Content-Type"); + if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); + } + + byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder + 
+ var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); + if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); + } + + if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { + return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, ""); + } + + return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data)); + } + catch (IOException ex) { + logger.error("Failed to extract body", ex); + return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, ""); + } + } + +} diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java new file mode 100644 index 00000000..1959f844 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java @@ -0,0 +1,23 @@ +package nu.marginalia.crawling.body; + +import nu.marginalia.crawling.model.CrawlerDocumentStatus; + +import java.util.Optional; +import java.util.function.BiFunction; + +public sealed interface DocumentBodyResult { + record Ok(String contentType, String body) implements DocumentBodyResult { + @Override + public Optional map(BiFunction fun) { + return Optional.of(fun.apply(contentType, body)); + } + } + record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { + @Override + public Optional map(BiFunction fun) { + return Optional.empty(); + } + } + + Optional map(BiFunction fun); +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/HttpFetchResult.java similarity index 63% rename from code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java rename to code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/HttpFetchResult.java index ae9673b1..9790e3da 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/HttpFetchResult.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/HttpFetchResult.java @@ -1,17 +1,23 @@ -package nu.marginalia.crawl.retreival.fetcher.warc; +package nu.marginalia.crawling.body; import okhttp3.Headers; +import org.jsoup.Jsoup; import org.netpreserve.jwarc.MessageHeaders; import org.netpreserve.jwarc.WarcResponse; import org.netpreserve.jwarc.WarcRevisit; +import org.jsoup.nodes.Document; import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.util.List; +import java.util.Optional; public sealed interface HttpFetchResult { + + boolean isOk(); + static ResultOk importWarc(WarcResponse response) throws IOException { var http = response.http(); try (var body = http.body()) { @@ -27,6 +33,7 @@ public sealed interface HttpFetchResult { ); } } + static ResultOk importWarc(WarcRevisit revisit) throws IOException { var http = revisit.http(); try (var body = http.body()) { @@ -41,7 +48,11 @@ public sealed interface HttpFetchResult { bytes.length ); } + finally { + revisit.body().consume(); + } } + record ResultOk(URI uri, int statusCode, Headers headers, @@ -50,6 +61,10 @@ public sealed interface HttpFetchResult { int 
bytesLength ) implements HttpFetchResult { + public boolean isOk() { + return statusCode >= 200 && statusCode < 300; + } + public ResultOk(URI uri, int statusCode, MessageHeaders headers, @@ -73,6 +88,14 @@ public sealed interface HttpFetchResult { return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength); } + public Optional parseDocument() throws IOException { + return switch(DocumentBodyExtractor.extractBody(this)) { + case DocumentBodyResult.Ok ok when "text/html".equalsIgnoreCase(ok.contentType()) + -> Optional.of(Jsoup.parse(ok.body())); + default -> Optional.empty(); + }; + } + public String header(String name) { return headers.get(name); } @@ -82,5 +105,34 @@ public sealed interface HttpFetchResult { }; - record ResultError(Exception ex) implements HttpFetchResult { }; + record ResultRetained(String url, String body) implements HttpFetchResult { + + public boolean isOk() { + return true; + } + + public Optional parseDocument() { + try { + return Optional.of(Jsoup.parse(body)); + } + catch (Exception ex) { + return Optional.empty(); + } + } + }; + record ResultException(Exception ex) implements HttpFetchResult { + public boolean isOk() { + return false; + } + }; + record ResultSame() implements HttpFetchResult { + public boolean isOk() { + return false; + } + }; + record ResultNone() implements HttpFetchResult { + public boolean isOk() { + return false; + } + }; } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java index b7021ace..0da0b790 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainReader.java @@ -1,156 +1,44 @@ package nu.marginalia.crawling.io; -import com.github.luben.zstd.RecyclingBufferPool; -import com.github.luben.zstd.ZstdInputStream; import com.google.gson.Gson; -import nu.marginalia.crawling.model.CrawledDocument; -import nu.marginalia.crawling.model.CrawledDomain; -import nu.marginalia.crawling.model.SerializableCrawlData; +import nu.marginalia.crawling.io.format.LegacyFileReadingSerializableCrawlDataStream; +import nu.marginalia.crawling.io.format.WarcReadingSerializableCrawlDataStream; import nu.marginalia.model.gson.GsonFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.*; +import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; -import java.util.Optional; -import java.util.concurrent.ForkJoinPool; -import java.util.concurrent.TimeUnit; public class CrawledDomainReader { - private final Gson gson = GsonFactory.get(); - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final ForkJoinPool pool = new ForkJoinPool(6); + private static final Gson gson = GsonFactory.get(); public CrawledDomainReader() { } /** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! 
*/ - public SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException { - return new FileReadingSerializableCrawlDataStream(gson, fullPath.toFile()); + public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException { + String fileName = fullPath.getFileName().toString(); + if (fileName.endsWith(".zstd")) { + return new LegacyFileReadingSerializableCrawlDataStream(gson, fullPath.toFile()); + } + else if (fileName.endsWith(".warc") || fileName.endsWith(".warc.gz")) { + return new WarcReadingSerializableCrawlDataStream(fullPath); + } + else { + throw new IllegalArgumentException("Unknown file type: " + fullPath); + } } /** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */ - public SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException { - return createDataStream(CrawlerOutputFile.getOutputFile(basePath, id, domain)); - } + public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException { + Path warcPath = CrawlerOutputFile.getWarcPath(basePath, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); - /** Read the entirety of the domain data into memory. This uses a lot of RAM */ - public CrawledDomain read(Path path) throws IOException { - DomainDataAssembler domainData = new DomainDataAssembler(); - - try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(path.toFile()), RecyclingBufferPool.INSTANCE)))) { - String line; - while ((line = br.readLine()) != null) { - if (line.startsWith("//")) { - String identifier = line; - String data = br.readLine(); - - pool.execute(() -> deserializeLine(identifier, data, domainData)); - } - } + if (Files.exists(warcPath)) { + return createDataStream(warcPath); } - - while (!pool.awaitQuiescence(1, TimeUnit.SECONDS)); - - return domainData.assemble(); - } - - - private void deserializeLine(String identifier, String data, DomainDataAssembler assembler) { - if (null == data) { - return; - } - if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) { - assembler.acceptDomain(gson.fromJson(data, CrawledDomain.class)); - } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) { - assembler.acceptDoc(gson.fromJson(data, CrawledDocument.class)); + else { + return createDataStream(CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain)); } } - public Optional readOptionally(Path path) { - try { - return Optional.of(read(path)); - } - catch (Exception ex) { - return Optional.empty(); - } - } - - private static class DomainDataAssembler { - private CrawledDomain domainPrototype; - private final List docs = new ArrayList<>(); - - public synchronized void acceptDomain(CrawledDomain domain) { - this.domainPrototype = domain; - } - - public synchronized void acceptDoc(CrawledDocument doc) { - docs.add(doc); - } - - public synchronized CrawledDomain assemble() { - if (!docs.isEmpty()) { - if (domainPrototype.doc == null) - domainPrototype.doc = new ArrayList<>(); - - domainPrototype.doc.addAll(docs); - } - return domainPrototype; - } - } - - private static class FileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { - private final Gson gson; - private final BufferedReader bufferedReader; - private SerializableCrawlData next = null; - - public FileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException { - this.gson = gson; - bufferedReader = new 
BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE))); - } - - @Override - public SerializableCrawlData next() throws IOException { - if (hasNext()) { - var ret = next; - next = null; - return ret; - } - throw new IllegalStateException("No more data"); - } - - @Override - public boolean hasNext() throws IOException { - if (next != null) - return true; - - String identifier = bufferedReader.readLine(); - if (identifier == null) { - bufferedReader.close(); - return false; - } - String data = bufferedReader.readLine(); - if (data == null) { - bufferedReader.close(); - return false; - } - - if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) { - next = gson.fromJson(data, CrawledDomain.class); - } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) { - next = gson.fromJson(data, CrawledDocument.class); - } - else { - throw new IllegalStateException("Unknown identifier: " + identifier); - } - return true; - } - - @Override - public void close() throws Exception { - bufferedReader.close(); - } - } } diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java index 0e278f09..f21715ee 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawledDomainWriter.java @@ -55,7 +55,7 @@ public class CrawledDomainWriter implements AutoCloseable { } private Path getOutputFile(String id, String name) throws IOException { - return CrawlerOutputFile.createOutputPath(outputDir, id, name); + return CrawlerOutputFile.createLegacyOutputPath(outputDir, id, name); } @Override diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java index 67e8738c..907eb081 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java @@ -9,7 +9,11 @@ import java.nio.file.Path; public class CrawlerOutputFile { /** Return the Path to a file for the given id and name */ - public static Path getOutputFile(Path base, String id, String name) { + public static Path getLegacyOutputFile(Path base, String id, String name) { + if (id.length() < 4) { + id = Strings.repeat("0", 4 - id.length()) + id; + } + String first = id.substring(0, 2); String second = id.substring(2, 4); @@ -19,7 +23,7 @@ public class CrawlerOutputFile { /** Return the Path to a file for the given id and name, creating the prerequisite * directory structure as necessary. 
*/ - public static Path createOutputPath(Path base, String id, String name) throws IOException { + public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException { if (id.length() < 4) { id = Strings.repeat("0", 4 - id.length()) + id; } @@ -49,20 +53,37 @@ public class CrawlerOutputFile { } - public static Path createWarcFile(Path baseDir, String id, String name, WarcFileVersion version) { + public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException { if (id.length() < 4) { id = Strings.repeat("0", 4 - id.length()) + id; } - String fileName = STR."\{id}-\{filesystemSafeName(name)}.zstd\{version.suffix}"; + String first = id.substring(0, 2); + String second = id.substring(2, 4); - return baseDir.resolve(fileName); + Path destDir = basePath.resolve(first).resolve(second); + if (!Files.exists(destDir)) { + Files.createDirectories(destDir); + } + return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz"); + } + + public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) { + if (id.length() < 4) { + id = Strings.repeat("0", 4 - id.length()) + id; + } + + String first = id.substring(0, 2); + String second = id.substring(2, 4); + + Path destDir = basePath.resolve(first).resolve(second); + return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}"); } public enum WarcFileVersion { - LIVE(".open"), - TEMP(".tmp"), - FINAL(""); + LIVE("open"), + TEMP("tmp"), + FINAL("final"); public final String suffix; diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java index 3aecc0fc..9598d002 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/SerializableCrawlDataStream.java @@ -1,11 +1,13 @@ package nu.marginalia.crawling.io; import nu.marginalia.crawling.model.SerializableCrawlData; +import org.jetbrains.annotations.Nullable; import java.io.IOException; +import java.nio.file.Path; import java.util.Iterator; -/** Closable iterator over serialized crawl data +/** Closable iterator exceptional over serialized crawl data * The data may appear in any order, and the iterator must be closed. 
* * @see CrawledDomainReader @@ -17,6 +19,8 @@ public interface SerializableCrawlDataStream extends AutoCloseable { boolean hasNext() throws IOException; + @Nullable + default Path path() { return null; } // Dummy iterator over nothing static SerializableCrawlDataStream empty() { diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java new file mode 100644 index 00000000..efff17f3 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/LegacyFileReadingSerializableCrawlDataStream.java @@ -0,0 +1,70 @@ +package nu.marginalia.crawling.io.format; + +import com.github.luben.zstd.RecyclingBufferPool; +import com.github.luben.zstd.ZstdInputStream; +import com.google.gson.Gson; +import nu.marginalia.crawling.io.SerializableCrawlDataStream; +import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.crawling.model.CrawledDomain; +import nu.marginalia.crawling.model.SerializableCrawlData; + +import java.io.*; +import java.nio.file.Path; + +public class LegacyFileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { + private final Gson gson; + private final BufferedReader bufferedReader; + private SerializableCrawlData next = null; + + private final Path path; + public LegacyFileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException { + this.gson = gson; + bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE))); + path = file.toPath(); + } + + @Override + public Path path() { + return path; + } + @Override + public SerializableCrawlData next() throws IOException { + if (hasNext()) { + var ret = next; + next = null; + return ret; + } + throw new IllegalStateException("No more data"); + } + + @Override + public boolean hasNext() throws IOException { + if (next != null) + return true; + + String identifier = bufferedReader.readLine(); + if (identifier == null) { + bufferedReader.close(); + return false; + } + String data = bufferedReader.readLine(); + if (data == null) { + bufferedReader.close(); + return false; + } + + if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) { + next = gson.fromJson(data, CrawledDomain.class); + } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) { + next = gson.fromJson(data, CrawledDocument.class); + } else { + throw new IllegalStateException("Unknown identifier: " + identifier); + } + return true; + } + + @Override + public void close() throws Exception { + bufferedReader.close(); + } +} diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java new file mode 100644 index 00000000..9d8d1a63 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java @@ -0,0 +1,156 @@ +package nu.marginalia.crawling.io.format; + +import lombok.SneakyThrows; +import nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.DocumentBodyResult; +import nu.marginalia.crawling.body.HttpFetchResult; +import 
nu.marginalia.crawling.io.SerializableCrawlDataStream; +import nu.marginalia.crawling.model.CrawledDocument; +import nu.marginalia.crawling.model.CrawledDomain; +import nu.marginalia.crawling.model.SerializableCrawlData; +import org.netpreserve.jwarc.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.StringJoiner; + +public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream { + private static final Logger logger = LoggerFactory.getLogger(WarcReadingSerializableCrawlDataStream.class); + + private final WarcReader reader; + private final Iterator backingIterator; + private SerializableCrawlData next = null; + private final Path path; + + public WarcReadingSerializableCrawlDataStream(Path file) throws IOException { + path = file; + reader = new WarcReader(file); + WarcXResponseReference.register(reader); + + backingIterator = reader.iterator(); + } + + @Override + public Path path() { + return path; + } + + @Override + @SneakyThrows + public boolean hasNext() { + while (backingIterator.hasNext() && next == null) { + var nextRecord = backingIterator.next(); + if (nextRecord instanceof WarcResponse response) { // this also includes WarcXResponseReference + convertResponse(response); + } + else if (nextRecord instanceof Warcinfo warcinfo) { + convertWarcinfo(warcinfo); + } + else if (nextRecord instanceof WarcMetadata metadata) { + convertMetadata(metadata); + } + } + return next != null; + } + + private void convertMetadata(WarcMetadata metadata) { + // Nothing to do here for now + } + + private void convertWarcinfo(Warcinfo warcinfo) throws IOException { + var headers = warcinfo.fields(); + String probeStatus = headers.first("X-WARC-Probe-Status").orElse(""); + String[] parts = probeStatus.split(" ", 2); + + + String domain = headers.first("domain").orElseThrow(() -> new IllegalStateException("Missing domain header")); + String status = parts[0]; + String statusReason = parts.length > 1 ? 
parts[1] : ""; + String ip = headers.first("ip").orElse(""); + + String redirectDomain = null; + if ("REDIRECT".equalsIgnoreCase(status)) { + redirectDomain = statusReason; + } + + // TODO: Fix cookies info somehow + next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip, List.of(), List.of()); + } + + private void convertResponse(WarcResponse response) throws IOException { + var http = response.http(); + + if (http.status() != 200) { + return; + } + CrawledDocument document; + + var parsedBody = DocumentBodyExtractor.extractBody(HttpFetchResult.importWarc(response)); + if (parsedBody instanceof DocumentBodyResult.Error error) { + next = new CrawledDocument( + "", + response.targetURI().toString(), + http.contentType().raw(), + response.date().toString(), + http.status(), + error.status().toString(), + error.why(), + headers(http.headers()), + null, + response.payloadDigest().map(WarcDigest::base64).orElse(""), + "", + "", + ""); + } else if (parsedBody instanceof DocumentBodyResult.Ok ok) { + next = new CrawledDocument( + "", + response.targetURI().toString(), + ok.contentType(), + response.date().toString(), + http.status(), + "OK", + "", + headers(http.headers()), + ok.body(), + response.payloadDigest().map(WarcDigest::base64).orElse(""), + "", + "", + ""); + } else { + // unreachable + throw new IllegalStateException("Unknown body type: " + parsedBody); + } + } + + public String headers(MessageHeaders headers) { + StringJoiner ret = new StringJoiner("\n"); + for (var header : headers.map().entrySet()) { + for (var value : header.getValue()) { + ret.add(STR."\{header.getKey()}: \{value}"); + } + } + return ret.toString(); + } + + public void close() throws IOException { + reader.close(); + } + + @Override + public SerializableCrawlData next() throws IOException { + if (!hasNext()) + throw new NoSuchElementException(); + try { + return next; + } + finally { + next = null; + } + } + +} diff --git a/code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java b/code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java new file mode 100644 index 00000000..7e02d936 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/org/netpreserve/jwarc/WarcXResponseReference.java @@ -0,0 +1,44 @@ +package org.netpreserve.jwarc; + +import java.io.IOException; +import java.net.URI; + +/** This defines a non-standard extension to WARC for storing old HTTP responses, + * essentially a 'revisit' with a full body, which is not something that is + * expected by the jwarc parser, and goes against the semantics of the revisit + * records a fair bit. + *

+ * An x-response-reference record is a response record with a full body, where + * the data is a reconstructed HTTP response from a previous crawl. + */ +public class WarcXResponseReference extends WarcResponse { + private static final String TYPE_NAME = "x-response-reference"; + + WarcXResponseReference(MessageVersion version, MessageHeaders headers, MessageBody body) { + super(version, headers, body); + } + + public static void register(WarcReader reader) { + reader.registerType(TYPE_NAME, WarcXResponseReference::new); + } + + public static class Builder extends AbstractBuilder { + public Builder(URI targetURI) { + this(targetURI.toString()); + } + + public Builder(String targetURI) { + super(TYPE_NAME); + setHeader("WARC-Target-URI", targetURI); + } + + public Builder body(HttpResponse httpResponse) throws IOException { + return body(MediaType.HTTP_RESPONSE, httpResponse); + } + + @Override + public WarcXResponseReference build() { + return build(WarcXResponseReference::new); + } + } +} diff --git a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java index 718dea06..cbb88772 100644 --- a/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java +++ b/code/process-models/crawling-model/src/main/java/plan/CrawlPlan.java @@ -74,23 +74,13 @@ public class CrawlPlan { return count; } + @Deprecated public Iterable domainsIterable() { - final CrawledDomainReader reader = new CrawledDomainReader(); - - return WorkLog.iterableMap(crawl.getLogFile(), - entry -> { - var path = getCrawledFilePath(entry.path()); - if (!Files.exists(path)) { - logger.warn("File not found: {}", path); - return Optional.empty(); - } - return reader.readOptionally(path); - }); + // This is no longer supported + throw new UnsupportedOperationException(); } public Iterable crawlDataIterable(Predicate idPredicate) { - final CrawledDomainReader reader = new CrawledDomainReader(); - return WorkLog.iterableMap(crawl.getLogFile(), entry -> { if (!idPredicate.test(entry.id())) { @@ -105,7 +95,7 @@ public class CrawlPlan { } try { - return Optional.of(reader.createDataStream(path)); + return Optional.of(CrawledDomainReader.createDataStream(path)); } catch (IOException ex) { return Optional.empty(); diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java index 5b5deddc..67b4f7b6 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/CrawlingThenConvertingIntegrationTest.java @@ -79,7 +79,7 @@ public class CrawlingThenConvertingIntegrationTest { List data = new ArrayList<>(); try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } CrawledDomain domain = data.stream().filter(CrawledDomain.class::isInstance).map(CrawledDomain.class::cast).findFirst().get(); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index a5d78a1f..f5b5a10e 100644 --- 
a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -62,7 +62,6 @@ public class CrawlerMain { private final SimpleBlockingThreadPool pool; private final Map processingIds = new ConcurrentHashMap<>(); - private final CrawledDomainReader reader = new CrawledDomainReader(); final AbortMonitor abortMonitor = AbortMonitor.getInstance(); @@ -142,6 +141,7 @@ public class CrawlerMain { public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException { heartbeat.start(); + try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log")); AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains()) ) { @@ -213,9 +213,9 @@ public class CrawlerMain { @Override public void run() throws Exception { - Path newWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); - Path tempFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); - Path finalWarcFile = CrawlerOutputFile.createWarcFile(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); + Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE); + Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP); + Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL); if (Files.exists(newWarcFile)) { Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING); @@ -224,9 +224,8 @@ public class CrawlerMain { Files.deleteIfExists(tempFile); } - try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id); - var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now - var retreiver = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder, writer::accept); + try (var warcRecorder = new WarcRecorder(newWarcFile); // write to a temp file for now + var retriever = new CrawlerRetreiver(fetcher, domainProber, specification, warcRecorder); CrawlDataReference reference = getReference()) { Thread.currentThread().setName("crawling:" + domain); @@ -234,39 +233,37 @@ public class CrawlerMain { var domainLinks = anchorTagsSource.getAnchorTags(domain); if (Files.exists(tempFile)) { - retreiver.syncAbortedRun(tempFile); + retriever.syncAbortedRun(tempFile); Files.delete(tempFile); } - int size = retreiver.fetch(domainLinks, reference); + int size = retriever.fetch(domainLinks, reference); + + // Delete the reference crawl data if it's not the same as the new one + // (mostly a case when migrating from legacy->warc) + reference.delete(); Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING); - workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size); + workLog.setJobToFinished(domain, finalWarcFile.toString(), size); heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks); logger.info("Fetched {}", domain); } catch (Exception e) { logger.error("Error fetching domain " + domain, e); Files.deleteIfExists(newWarcFile); - if (tempFile != null) { - Files.deleteIfExists(tempFile); - } + Files.deleteIfExists(tempFile); } finally { // We don't need to double-count these; it's also kept int he workLog processingIds.remove(domain); Thread.currentThread().setName("[idle]"); - - // 
FIXME: Remove this when we're done - Files.deleteIfExists(finalWarcFile); } } private CrawlDataReference getReference() { try { - var dataStream = reader.createDataStream(outputDir, domain, id); - return new CrawlDataReference(dataStream); + return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id)); } catch (IOException e) { logger.debug("Failed to read previous crawl data for {}", specification.domain); return new CrawlDataReference(); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java index 985bfc39..9088ebb4 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlDataReference.java @@ -8,6 +8,8 @@ import nu.marginalia.lsh.EasyLSH; import javax.annotation.Nullable; import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; /** A reference to a domain that has been crawled before. */ public class CrawlDataReference implements AutoCloseable { @@ -22,6 +24,15 @@ public class CrawlDataReference implements AutoCloseable { this(SerializableCrawlDataStream.empty()); } + /** Delete the associated data from disk, if it exists */ + public void delete() throws IOException { + Path filePath = data.path(); + + if (filePath != null) { + Files.deleteIfExists(filePath); + } + } + @Nullable public CrawledDocument nextDocument() { try { @@ -37,12 +48,10 @@ public class CrawlDataReference implements AutoCloseable { return null; } - public boolean isContentBodySame(CrawledDocument one, CrawledDocument other) { - assert one.documentBody != null; - assert other.documentBody != null; + public boolean isContentBodySame(String one, String other) { - final long contentHashOne = contentHash(one.documentBody); - final long contentHashOther = contentHash(other.documentBody); + final long contentHashOne = contentHash(one); + final long contentHashOther = contentHash(other); return EasyLSH.hammingDistance(contentHashOne, contentHashOther) < 4; } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java index b3ab9ee5..37f84d58 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawledDocumentFactory.java @@ -1,6 +1,6 @@ package nu.marginalia.crawl.retreival; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawlerDocumentStatus; import nu.marginalia.model.EdgeUrl; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java index 30054008..514243ee 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java @@ -7,7 +7,7 @@ import lombok.SneakyThrows; import 
nu.marginalia.atags.model.DomainLinks; import nu.marginalia.crawl.retreival.fetcher.ContentTags; import nu.marginalia.crawl.retreival.fetcher.HttpFetcher; -import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawl.retreival.revisit.CrawlerRevisitor; import nu.marginalia.crawl.retreival.revisit.DocumentWithReference; @@ -18,16 +18,15 @@ import nu.marginalia.ip_blocklist.UrlBlocklist; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.crawlspec.CrawlSpecRecord; -import org.jsoup.Jsoup; import org.jsoup.nodes.Document; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.net.InetAddress; import java.net.UnknownHostException; import java.nio.file.Path; import java.util.*; -import java.util.function.Consumer; public class CrawlerRetreiver implements AutoCloseable { @@ -36,7 +35,6 @@ public class CrawlerRetreiver implements AutoCloseable { private final HttpFetcher fetcher; private final String domain; - private final Consumer crawledDomainWriter; private static final LinkParser linkParser = new LinkParser(); private static final Logger logger = LoggerFactory.getLogger(CrawlerRetreiver.class); @@ -56,8 +54,7 @@ public class CrawlerRetreiver implements AutoCloseable { public CrawlerRetreiver(HttpFetcher fetcher, DomainProber domainProber, CrawlSpecRecord specs, - WarcRecorder warcRecorder, - Consumer writer) + WarcRecorder warcRecorder) { this.warcRecorder = warcRecorder; this.fetcher = fetcher; @@ -65,11 +62,8 @@ public class CrawlerRetreiver implements AutoCloseable { domain = specs.domain; - crawledDomainWriter = writer; - - crawlFrontier = new DomainCrawlFrontier(new EdgeDomain(domain), Objects.requireNonNullElse(specs.urls, List.of()), specs.crawlDepth); - crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, crawledDomainWriter, this, warcRecorder); + crawlerRevisitor = new CrawlerRevisitor(crawlFrontier, this, warcRecorder); sitemapFetcher = new SitemapFetcher(crawlFrontier, fetcher.createSitemapRetriever()); // We must always crawl the index page first, this is assumed when fingerprinting the server @@ -94,32 +88,13 @@ public class CrawlerRetreiver implements AutoCloseable { public int fetch(DomainLinks domainLinks, CrawlDataReference oldCrawlData) { final DomainProber.ProbeResult probeResult = domainProber.probeDomain(fetcher, domain, crawlFrontier.peek()); - return switch (probeResult) { - case DomainProber.ProbeResultOk(EdgeUrl probedUrl) -> crawlDomain(oldCrawlData, probedUrl, domainLinks); - case DomainProber.ProbeResultError(CrawlerDomainStatus status, String desc) -> { - crawledDomainWriter.accept( - CrawledDomain.builder() - .crawlerStatus(status.name()) - .crawlerStatusDesc(desc) - .domain(domain) - .ip(findIp(domain)) - .build() - ); - yield 1; - } - case DomainProber.ProbeResultRedirect(EdgeDomain redirectDomain) -> { - crawledDomainWriter.accept( - CrawledDomain.builder() - .crawlerStatus(CrawlerDomainStatus.REDIRECT.name()) - .crawlerStatusDesc("Redirected to different domain") - .redirectDomain(redirectDomain.toString()) - .domain(domain) - .ip(findIp(domain)) - .build() - ); - yield 1; - } - }; + try { + return crawlDomain(oldCrawlData, probeResult, domainLinks); + } + catch (Exception ex) { + logger.error("Error crawling domain {}", domain, ex); + return 0; + } } public void syncAbortedRun(Path warcFile) { @@ -128,9 +103,21 @@ 
public class CrawlerRetreiver implements AutoCloseable { resync.run(warcFile); } - private int crawlDomain(CrawlDataReference oldCrawlData, EdgeUrl rootUrl, DomainLinks domainLinks) { + private int crawlDomain(CrawlDataReference oldCrawlData, DomainProber.ProbeResult probeResult, DomainLinks domainLinks) throws IOException { String ip = findIp(domain); + EdgeUrl rootUrl; + + warcRecorder.writeWarcinfoHeader(ip, new EdgeDomain(domain), probeResult); + + if (!(probeResult instanceof DomainProber.ProbeResultOk ok)) { + return 1; + } + else { + rootUrl = ok.probedUrl(); + } + + assert !crawlFrontier.isEmpty(); final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain, warcRecorder); @@ -170,7 +157,7 @@ public class CrawlerRetreiver implements AutoCloseable { var top = crawlFrontier.takeNextUrl(); if (!robotsRules.isAllowed(top.toString())) { - crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(top)); + warcRecorder.flagAsRobotsTxtError(top); continue; } @@ -193,15 +180,13 @@ public class CrawlerRetreiver implements AutoCloseable { continue; - if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isPresent()) { + if (fetchWriteAndSleep(top, delayTimer, DocumentWithReference.empty()).isOk()) { fetchedCount++; } } ret.cookies = fetcher.getCookies(); - crawledDomainWriter.accept(ret); - return fetchedCount; } @@ -216,16 +201,16 @@ public class CrawlerRetreiver implements AutoCloseable { var url = rootUrl.withPathAndParam("/", null); - var maybeSample = fetchUrl(url, delayTimer, DocumentWithReference.empty()).filter(sample -> sample.httpStatus == 200); - if (maybeSample.isEmpty()) + var result = tryDownload(url, delayTimer, ContentTags.empty()); + if (!(result instanceof HttpFetchResult.ResultOk ok)) return; - var sample = maybeSample.get(); - if (sample.documentBody == null) + var optDoc = ok.parseDocument(); + if (optDoc.isEmpty()) return; // Sniff the software based on the sample document - var doc = Jsoup.parse(sample.documentBody); + var doc = optDoc.get(); crawlFrontier.setLinkFilter(linkFilterSelector.selectFilter(doc)); for (var link : doc.getElementsByTag("link")) { @@ -252,41 +237,54 @@ public class CrawlerRetreiver implements AutoCloseable { } } - public Optional fetchWriteAndSleep(EdgeUrl top, + public HttpFetchResult fetchWriteAndSleep(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) { logger.debug("Fetching {}", top); long startTime = System.currentTimeMillis(); - var docOpt = fetchUrl(top, timer, reference); + var contentTags = reference.getContentTags(); + var fetchedDoc = tryDownload(top, timer, contentTags); - if (docOpt.isPresent()) { - var doc = docOpt.get(); - - if (!Objects.equals(doc.recrawlState, CrawlerRevisitor.documentWasRetainedTag) - && reference.isContentBodySame(doc)) - { - // The document didn't change since the last time - doc.recrawlState = CrawlerRevisitor.documentWasSameTag; + if (fetchedDoc instanceof HttpFetchResult.ResultSame) { + var doc = reference.doc(); + if (doc != null) { + warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody); + fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.documentBody); } + } - crawledDomainWriter.accept(doc); + try { + if (fetchedDoc instanceof HttpFetchResult.ResultOk ok) { + var docOpt = ok.parseDocument(); + if (docOpt.isPresent()) { + var doc = docOpt.get(); - if (doc.url != null) { - // We may have redirected to a different path - EdgeUrl.parse(doc.url).ifPresent(crawlFrontier::addVisited); + 
crawlFrontier.enqueueLinksFromDocument(top, doc); + crawlFrontier.addVisited(new EdgeUrl(ok.uri())); + } } + else if (fetchedDoc instanceof HttpFetchResult.ResultRetained retained) { + var docOpt = retained.parseDocument(); + if (docOpt.isPresent()) { + var doc = docOpt.get(); - if ("ERROR".equals(doc.crawlerStatus) && doc.httpStatus != 404) { - errorCount++; + crawlFrontier.enqueueLinksFromDocument(top, doc); + EdgeUrl.parse(retained.url()).ifPresent(crawlFrontier::addVisited); + } } - + else if (fetchedDoc instanceof HttpFetchResult.ResultException ex) { + errorCount ++; + } + } + catch (Exception ex) { + logger.error("Error parsing document {}", top, ex); } timer.delay(System.currentTimeMillis() - startTime); - return docOpt; + return fetchedDoc; } private boolean isAllowedProtocol(String proto) { @@ -294,42 +292,11 @@ public class CrawlerRetreiver implements AutoCloseable { || proto.equalsIgnoreCase("https"); } - private Optional fetchUrl(EdgeUrl top, CrawlDelayTimer timer, DocumentWithReference reference) { - try { - var contentTags = reference.getContentTags(); - var fetchedDoc = tryDownload(top, timer, contentTags); - - CrawledDocument doc = reference.replaceOn304(fetchedDoc); - - if (doc.documentBody != null) { - doc.documentBodyHash = createHash(doc.documentBody); - - var parsedDoc = Jsoup.parse(doc.documentBody); - EdgeUrl url = new EdgeUrl(doc.url); - - crawlFrontier.enqueueLinksFromDocument(url, parsedDoc); - findCanonicalUrl(url, parsedDoc) - .ifPresent(canonicalLink -> doc.canonicalUrl = canonicalLink.toString()); - } - - return Optional.of(doc); - } - catch (Exception ex) { - logger.warn("Failed to process document {}", top); - } - - return Optional.empty(); - - } - - @SneakyThrows - private CrawledDocument tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) { + private HttpFetchResult tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) { for (int i = 0; i < 2; i++) { try { - var doc = fetcher.fetchContent(top, warcRecorder, tags); - doc.recrawlState = "NEW"; - return doc; + return fetcher.fetchContent(top, warcRecorder, tags); } catch (RateLimitException ex) { timer.slowDown(); @@ -339,15 +306,20 @@ public class CrawlerRetreiver implements AutoCloseable { Thread.sleep(delay); } } + catch (Exception ex) { + logger.warn("Failed to fetch {}", top, ex); + return new HttpFetchResult.ResultException(ex); + } } - return CrawledDocumentFactory.createRetryError(top); + return new HttpFetchResult.ResultNone(); } private String createHash(String documentBodyHash) { return hashMethod.hashUnencodedChars(documentBodyHash).toString(); } + // FIXME this does not belong in the crawler private Optional findCanonicalUrl(EdgeUrl baseUrl, Document parsed) { baseUrl = baseUrl.domain.toRootUrl(); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java index 01bafbe1..1a66c7a5 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java @@ -1,8 +1,8 @@ package nu.marginalia.crawl.retreival; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; +import 
nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.DocumentBodyResult; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.model.EdgeUrl; import org.jsoup.Jsoup; @@ -87,7 +87,7 @@ public class CrawlerWarcResynchronizer { } private void revisit(WarcRevisit revisit) throws IOException { - if (!WarcRecorder.revisitURI.equals(revisit.profile())) { + if (!WarcRecorder.documentRevisitURN.equals(revisit.profile())) { return; } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java index 55f2e633..df070cc5 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/ContentTypeProber.java @@ -1,6 +1,6 @@ package nu.marginalia.crawl.retreival.fetcher; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; +import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.model.EdgeUrl; import okhttp3.OkHttpClient; import okhttp3.Request; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java index be815954..70576510 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcher.java @@ -3,12 +3,11 @@ package nu.marginalia.crawl.retreival.fetcher; import com.google.inject.ImplementedBy; import crawlercommons.robots.SimpleRobotRules; import nu.marginalia.crawl.retreival.RateLimitException; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; -import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; -import java.nio.file.Path; import java.util.List; @ImplementedBy(HttpFetcherImpl.class) @@ -20,7 +19,7 @@ public interface HttpFetcher { FetchResult probeDomain(EdgeUrl url); - CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException; + HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException; SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java index 3faffe4a..d7732baa 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java @@ -8,30 +8,26 @@ import lombok.SneakyThrows; import nu.marginalia.crawl.retreival.Cookies; import nu.marginalia.crawl.retreival.RateLimitException; import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyExtractor; -import nu.marginalia.crawl.retreival.fetcher.body.DocumentBodyResult; 
-import nu.marginalia.crawl.retreival.fetcher.socket.*; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; -import static nu.marginalia.crawl.retreival.CrawledDocumentFactory.*; +import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory; +import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; -import nu.marginalia.crawling.model.CrawledDocument; -import nu.marginalia.crawling.model.CrawlerDocumentStatus; +import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; -import okhttp3.*; +import okhttp3.ConnectionPool; +import okhttp3.Dispatcher; +import okhttp3.OkHttpClient; +import okhttp3.Request; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.net.ssl.SSLException; import javax.net.ssl.X509TrustManager; -import java.io.EOFException; -import java.io.IOException; -import java.net.*; -import java.nio.charset.IllegalCharsetNameException; -import java.time.LocalDateTime; -import java.util.*; +import java.util.List; +import java.util.Objects; +import java.util.Optional; import java.util.concurrent.TimeUnit; @@ -141,9 +137,9 @@ public class HttpFetcherImpl implements HttpFetcher { @Override @SneakyThrows - public CrawledDocument fetchContent(EdgeUrl url, - WarcRecorder warcRecorder, - ContentTags contentTags) + public HttpFetchResult fetchContent(EdgeUrl url, + WarcRecorder warcRecorder, + ContentTags contentTags) throws RateLimitException { @@ -152,23 +148,21 @@ public class HttpFetcherImpl implements HttpFetcher { if (contentTags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url)) { ContentTypeProbeResult probeResult = contentTypeProber.probeContentType(url); - switch (probeResult) { - case ContentTypeProbeResult.Ok(EdgeUrl redirectUrl) -> { - url = redirectUrl; - } - case ContentTypeProbeResult.BadContentType (String contentType, int statusCode) -> { - return createErrorResponse(url, contentType, statusCode, - CrawlerDocumentStatus.BAD_CONTENT_TYPE, - contentType - ); - } - case ContentTypeProbeResult.Timeout timeout -> { - return createTimeoutErrorRsp(url); - } - case ContentTypeProbeResult.Exception ex -> { - return createErrorFromException(url, ex.ex()); - } - }; + if (probeResult instanceof ContentTypeProbeResult.Ok ok) { + url = ok.resolvedUrl(); + } + else if (probeResult instanceof ContentTypeProbeResult.BadContentType badContentType) { + warcRecorder.flagAsFailedContentTypeProbe(url, badContentType.contentType(), badContentType.statusCode()); + return new HttpFetchResult.ResultNone(); + } + else if (probeResult instanceof ContentTypeProbeResult.BadContentType.Timeout timeout) { + warcRecorder.flagAsTimeout(url); + return new HttpFetchResult.ResultNone(); + } + else if (probeResult instanceof ContentTypeProbeResult.Exception exception) { + warcRecorder.flagAsError(url, exception.ex()); + return new HttpFetchResult.ResultNone(); + } } var getBuilder = new Request.Builder().get(); @@ -181,78 +175,20 @@ public class HttpFetcherImpl implements HttpFetcher { HttpFetchResult result = warcRecorder.fetch(client, getBuilder.build()); - if (result instanceof HttpFetchResult.ResultError err) { - return createErrorFromException(url, err.ex()); - } - else 
if (result instanceof HttpFetchResult.ResultOk ok) { - try { - return extractBody(userAgent, url, ok); + if (result instanceof HttpFetchResult.ResultOk ok) { + if (ok.statusCode() == 429) { + String retryAfter = Objects.requireNonNullElse(ok.header("Retry-After"), "1000"); + throw new RateLimitException(retryAfter); } - catch (Exception ex) { - return createErrorFromException(url, ex); + if (ok.statusCode() == 304) { + return new HttpFetchResult.ResultSame(); + } + if (ok.statusCode() == 200) { + return ok; } } - else { - throw new IllegalStateException(STR."Unknown result type \{result.getClass()}"); - } - } - private CrawledDocument createErrorFromException(EdgeUrl url, Exception exception) throws RateLimitException { - return switch (exception) { - case RateLimitException rle -> throw rle; - case SocketTimeoutException ex -> createTimeoutErrorRsp(url); - case UnknownHostException ex -> createUnknownHostError(url); - case SocketException ex -> createHardErrorRsp(url, ex); - case ProtocolException ex -> createHardErrorRsp(url, ex); - case IllegalCharsetNameException ex -> createHardErrorRsp(url, ex); - case SSLException ex -> createHardErrorRsp(url, ex); - case EOFException ex -> createHardErrorRsp(url, ex); - default -> { - logger.error("Error during fetching", exception); - yield createHardErrorRsp(url, exception); - } - }; - } - - public static CrawledDocument extractBody(String userAgent, EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException { - - var responseUrl = new EdgeUrl(rsp.uri()); - - if (!Objects.equals(responseUrl.domain, url.domain)) { - return createRedirectResponse(url, rsp, responseUrl); - } - - if (rsp.statusCode() == 429) { - String retryAfter = Objects.requireNonNullElse(rsp.header("Retry-After"), "1000"); - - throw new RateLimitException(retryAfter); - } - - if (!isXRobotsTagsPermitted(rsp.allHeaders("X-Robots-Tag"), userAgent)) { - return CrawledDocument.builder() - .crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name()) - .crawlerStatusDesc("X-Robots-Tag") - .url(responseUrl.toString()) - .httpStatus(-1) - .timestamp(LocalDateTime.now().toString()) - .headers(rsp.headers().toString()) - .build(); - } - - return switch(DocumentBodyExtractor.extractBody(rsp)) { - case DocumentBodyResult.Error(CrawlerDocumentStatus status, String why) -> - createErrorResponse(url, rsp, status, why); - case DocumentBodyResult.Ok(String contentType, String body) -> - CrawledDocument.builder() - .crawlerStatus(CrawlerDocumentStatus.OK.name()) - .headers(rsp.headers().toString()) - .contentType(contentType) - .timestamp(LocalDateTime.now().toString()) - .httpStatus(rsp.statusCode()) - .url(responseUrl.toString()) - .documentBody(body) - .build(); - }; + return new HttpFetchResult.ResultNone(); } /** Check X-Robots-Tag header tag to see if we are allowed to index this page. 
@@ -318,17 +254,31 @@ public class HttpFetcherImpl implements HttpFetcher { private Optional fetchRobotsForProto(String proto, WarcRecorder recorder, EdgeDomain domain) { try { var url = new EdgeUrl(proto, domain, null, "/robots.txt", null); - return Optional.of(parseRobotsTxt(fetchContent(url, recorder, ContentTags.empty()))); + + var getBuilder = new Request.Builder().get(); + + getBuilder.url(url.toString()) + .addHeader("Accept-Encoding", "gzip") + .addHeader("User-agent", userAgent); + + HttpFetchResult result = recorder.fetch(client, getBuilder.build()); + + if (result instanceof HttpFetchResult.ResultOk ok) { + return Optional.of(parseRobotsTxt(ok)); + } + else { + return Optional.empty(); + } } catch (Exception ex) { return Optional.empty(); } } - private SimpleRobotRules parseRobotsTxt(CrawledDocument doc) { - return robotsParser.parseContent(doc.url, - doc.documentBody.getBytes(), - doc.contentType, + private SimpleRobotRules parseRobotsTxt(HttpFetchResult.ResultOk ok) { + return robotsParser.parseContent(ok.uri().toString(), + ok.bytesRaw(), + ok.header("Content-Type"), userAgent); } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java deleted file mode 100644 index 99ae2cae..00000000 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyExtractor.java +++ /dev/null @@ -1,44 +0,0 @@ -package nu.marginalia.crawl.retreival.fetcher.body; - -import nu.marginalia.contenttype.ContentTypeParser; -import nu.marginalia.contenttype.DocumentBodyToString; -import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; -import nu.marginalia.crawling.model.CrawlerDocumentStatus; -import org.apache.commons.io.input.BOMInputStream; - -import java.io.IOException; -import java.util.zip.GZIPInputStream; - -public class DocumentBodyExtractor { - private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); - - public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) throws IOException { - var byteStream = rsp.getInputStream(); - - if ("gzip".equals(rsp.header("Content-Encoding"))) { - byteStream = new GZIPInputStream(byteStream); - } - byteStream = new BOMInputStream(byteStream); - - var contentTypeHeader = rsp.header("Content-Type"); - if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) { - return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); - } - - byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder - - var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data); - if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) { - return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, ""); - } - - if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) { - return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, ""); - } - - - return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data)); - } - -} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java deleted file mode 
100644 index fc5d67ec..00000000 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/body/DocumentBodyResult.java +++ /dev/null @@ -1,8 +0,0 @@ -package nu.marginalia.crawl.retreival.fetcher.body; - -import nu.marginalia.crawling.model.CrawlerDocumentStatus; - -public sealed interface DocumentBodyResult { - record Ok(String contentType, String body) implements DocumentBodyResult { } - record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult { } -} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java index 3d4b5aaa..b7bb82bd 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java @@ -1,6 +1,9 @@ package nu.marginalia.crawl.retreival.fetcher.warc; +import nu.marginalia.crawl.retreival.DomainProber; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor; +import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import okhttp3.OkHttpClient; import okhttp3.Request; @@ -8,7 +11,6 @@ import org.netpreserve.jwarc.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.net.InetAddress; @@ -18,9 +20,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.security.NoSuchAlgorithmException; import java.time.Instant; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; /** Based on JWarc's fetch method, APL 2.0 license *
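A rough sketch (not part of the patch) of how the reworked robots.txt path above composes: the request goes through WarcRecorder.fetch() so the transaction is captured in the WARC, and the rules are parsed straight from the raw bytes of the ResultOk. The OkHttpClient, parser instance and user agent stand in for fields HttpFetcherImpl already holds.

    import crawlercommons.robots.SimpleRobotRules;
    import crawlercommons.robots.SimpleRobotRulesParser;
    import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
    import nu.marginalia.crawling.body.HttpFetchResult;
    import nu.marginalia.model.EdgeDomain;
    import nu.marginalia.model.EdgeUrl;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;

    import java.util.Optional;

    class RobotsFetchSketch {
        private final OkHttpClient client = new OkHttpClient();           // placeholder client
        private final SimpleRobotRulesParser robotsParser = new SimpleRobotRulesParser();
        private final String userAgent = "search.marginalia.nu (sketch)"; // placeholder UA

        Optional<SimpleRobotRules> fetchRobots(WarcRecorder recorder, EdgeDomain domain) {
            try {
                var url = new EdgeUrl("https", domain, null, "/robots.txt", null);
                var request = new Request.Builder().get()
                        .url(url.toString())
                        .addHeader("Accept-Encoding", "gzip")
                        .addHeader("User-agent", userAgent)
                        .build();

                // Anything other than ResultOk (timeouts, probe failures, errors) yields no rules
                if (recorder.fetch(client, request) instanceof HttpFetchResult.ResultOk ok) {
                    return Optional.of(robotsParser.parseContent(ok.uri().toString(),
                            ok.bytesRaw(),
                            ok.header("Content-Type"),
                            userAgent));
                }
                return Optional.empty();
            }
            catch (Exception ex) {
                return Optional.empty();
            }
        }
    }
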

@@ -29,7 +29,12 @@ import java.util.Map; * be reconstructed. */ public class WarcRecorder implements AutoCloseable { - public static final URI revisitURI = URI.create("urn:marginalia:revisit"); + public static final URI documentRevisitURN = URI.create("urn:marginalia/data/doc/revisit"); + + public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped"); + public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe"); + public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe"); + public static final URI documentUnspecifiedError = URI.create("urn:marginalia/meta/doc/error"); private static final int MAX_TIME = 30_000; private static final int MAX_SIZE = 1024 * 1024 * 10; @@ -37,10 +42,14 @@ public class WarcRecorder implements AutoCloseable { private final Path warcFile; private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class); - private ThreadLocal bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]); + private final ThreadLocal bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]); private boolean temporaryFile = false; + // Affix a version string in case we need to change the format in the future + // in some way + private final String warcRecorderVersion = "1.0"; + /** * Create a new WarcRecorder that will write to the given file * @@ -48,7 +57,7 @@ public class WarcRecorder implements AutoCloseable { */ public WarcRecorder(Path warcFile) throws IOException { this.warcFile = warcFile; - this.writer = new WarcWriter(this.warcFile); + this.writer = new WarcWriter(warcFile); } /** @@ -170,7 +179,7 @@ public class WarcRecorder implements AutoCloseable { } catch (Exception ex) { logger.warn("Failed to fetch URL {}", uri, ex); - return new HttpFetchResult.ResultError(ex); + return new HttpFetchResult.ResultException(ex); } } @@ -178,55 +187,141 @@ public class WarcRecorder implements AutoCloseable { writer.write(item); } - /** - * Flag the given URL as skipped by the crawler, so that it will not be retried. - * Which URLs were skipped is still important when resynchronizing on the WARC file, - * so that the crawler can avoid re-fetching them. 
- * - * @param url The URL to flag - * @param headers - * @param documentBody - */ - public void flagAsSkipped(EdgeUrl url, String headers, int statusCode, String documentBody) { + private void saveOldResponse(EdgeUrl url, String contentType, int statusCode, String documentBody) { try { WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder(); WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder(); - String header = WarcProtocolReconstructor.getResponseHeader(headers, statusCode); + byte[] bytes = documentBody.getBytes(); + + String fakeHeaders = STR.""" + Content-Type: \{contentType} + Content-Length: \{bytes.length} + Content-Encoding: UTF-8 + """; + + String header = WarcProtocolReconstructor.getResponseHeader(fakeHeaders, statusCode); ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(); responseDataBuffer.put(header); responseDigestBuilder.update(header); - try (var inputStream = new ByteArrayInputStream(documentBody.getBytes())) { - int remainingLength; - while ((remainingLength = responseDataBuffer.remaining()) > 0) { - int startPos = responseDataBuffer.pos(); + responseDigestBuilder.update(bytes, bytes.length); + payloadDigestBuilder.update(bytes, bytes.length); + responseDataBuffer.put(bytes, 0, bytes.length); - int n = responseDataBuffer.readFrom(inputStream, remainingLength); - if (n < 0) - break; - - responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n); - responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n); - } - } - - WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), revisitURI) + WarcXResponseReference reference = new WarcXResponseReference.Builder(url.asURI()) .blockDigest(responseDigestBuilder.build()) .payloadDigest(payloadDigestBuilder.build()) .date(Instant.now()) .body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes()) .build(); - revisit.http(); // force HTTP header to be parsed before body is consumed so that caller can use it + reference.http(); // force HTTP header to be parsed before body is consumed so that caller can use it - writer.write(revisit); + writer.write(reference); } catch (URISyntaxException | IOException | NoSuchAlgorithmException e) { throw new RuntimeException(e); } } + /** + * Flag the given URL as skipped by the crawler, so that it will not be retried. + * Which URLs were skipped is still important when resynchronizing on the WARC file, + * so that the crawler can avoid re-fetching them. + */ + public void flagAsSkipped(EdgeUrl url, String contentType, int statusCode, String documentBody) { + saveOldResponse(url, contentType, statusCode, documentBody); + } + + /** + * Write a reference copy of the given document data. This is used when the crawler provides + * an E-Tag or Last-Modified header, and the server responds with a 304 Not Modified. In this + * scenario we want to record the data as it was in the previous crawl, but not re-fetch it. 
+ */ + public void writeReferenceCopy(EdgeUrl url, String contentType, int statusCode, String documentBody) { + saveOldResponse(url, contentType, statusCode, documentBody); + } + + public void writeWarcinfoHeader(String ip, EdgeDomain domain, DomainProber.ProbeResult result) throws IOException { + + Map> fields = new HashMap<>(); + fields.put("ip", List.of(ip)); + fields.put("software", List.of(STR."search.marginalia.nu/\{warcRecorderVersion}")); + fields.put("domain", List.of(domain.toString())); + + switch (result) { + case DomainProber.ProbeResultRedirect redirectDomain: + fields.put("X-WARC-Probe-Status", List.of(STR."REDIRECT;\{redirectDomain.domain()}")); + break; + case DomainProber.ProbeResultError error: + fields.put("X-WARC-Probe-Status", List.of(STR."\{error.status().toString()};\{error.desc()}")); + break; + case DomainProber.ProbeResultOk ok: + fields.put("X-WARC-Probe-Status", List.of("OK")); + break; + } + + var warcinfo = new Warcinfo.Builder() + .date(Instant.now()) + .fields(fields) + .recordId(UUID.randomUUID()) + .build(); + + writer.write(warcinfo); + } + + public void flagAsRobotsTxtError(EdgeUrl top) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(top.asURI(), documentRobotsTxtSkippedURN) + .date(Instant.now()) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } + + public void flagAsFailedContentTypeProbe(EdgeUrl url, String contentType, int status) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentBadContentTypeURN) + .date(Instant.now()) + .addHeader("Rejected-Content-Type", contentType) + .addHeader("Http-Status", Integer.toString(status)) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } + + public void flagAsError(EdgeUrl url, Exception ex) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentUnspecifiedError) + .date(Instant.now()) + .addHeader("Exception", ex.getClass().getSimpleName()) + .addHeader("ErrorMessage", Objects.requireNonNullElse(ex.getMessage(), "")) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } + + public void flagAsTimeout(EdgeUrl url) { + try { + WarcRevisit revisit = new WarcRevisit.Builder(url.asURI(), documentProbeTimeout) + .date(Instant.now()) + .build(); + + writer.write(revisit); + } catch (URISyntaxException | IOException e) { + throw new RuntimeException(e); + } + } private class ResponseDataBuffer { private final byte[] data; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java index c77af845..70a98310 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/CrawlerRevisitor.java @@ -5,15 +5,11 @@ import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.CrawlDelayTimer; import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.DomainCrawlFrontier; -import nu.marginalia.crawl.retreival.CrawledDocumentFactory; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawling.model.CrawledDocument; -import 
nu.marginalia.crawling.model.SerializableCrawlData; import nu.marginalia.model.EdgeUrl; import org.jsoup.Jsoup; -import java.util.function.Consumer; - /** This class encapsulates the logic for re-visiting a domain that has already been crawled. * We may use information from the previous crawl to inform the next crawl, specifically the * E-Tag and Last-Modified headers. @@ -27,16 +23,13 @@ public class CrawlerRevisitor { private final DomainCrawlFrontier crawlFrontier; - private final Consumer crawledDomainWriter; private final CrawlerRetreiver crawlerRetreiver; private final WarcRecorder warcRecorder; public CrawlerRevisitor(DomainCrawlFrontier crawlFrontier, - Consumer crawledDomainWriter, CrawlerRetreiver crawlerRetreiver, WarcRecorder warcRecorder) { this.crawlFrontier = crawlFrontier; - this.crawledDomainWriter = crawledDomainWriter; this.crawlerRetreiver = crawlerRetreiver; this.warcRecorder = warcRecorder; } @@ -69,7 +62,7 @@ public class CrawlerRevisitor { if (doc.httpStatus != 200) continue; if (!robotsRules.isAllowed(url.toString())) { - crawledDomainWriter.accept(CrawledDocumentFactory.createRobotsError(url)); + warcRecorder.flagAsRobotsTxtError(url); continue; } if (!crawlFrontier.filterLink(url)) @@ -87,7 +80,6 @@ public class CrawlerRevisitor { // fashion to make sure we eventually catch changes over time // and ensure we discover new links - crawledDomainWriter.accept(doc); crawlFrontier.addVisited(url); // Hoover up any links from the document @@ -97,7 +89,7 @@ public class CrawlerRevisitor { } // Add a WARC record so we don't repeat this - warcRecorder.flagAsSkipped(url, doc.headers, doc.httpStatus, doc.documentBody); + warcRecorder.flagAsSkipped(url, doc.contentType, doc.httpStatus, doc.documentBody); continue; } @@ -107,15 +99,14 @@ public class CrawlerRevisitor { // providing etag and last-modified headers, so we can recycle the // document if it hasn't changed without actually downloading it - var fetchedDocOpt = crawlerRetreiver.fetchWriteAndSleep(url, - delayTimer, - new DocumentWithReference(doc, oldCrawlData)); - if (fetchedDocOpt.isEmpty()) continue; + var reference = new DocumentWithReference(doc, oldCrawlData); + var result = crawlerRetreiver.fetchWriteAndSleep(url, delayTimer, reference); - if (documentWasRetainedTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; - else if (documentWasSameTag.equals(fetchedDocOpt.get().recrawlState)) retained ++; + if (reference.isSame(result)) { + retained++; + } - recrawled ++; + recrawled++; } return recrawled; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java index e832541f..03b96760 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java @@ -1,12 +1,15 @@ package nu.marginalia.crawl.retreival.revisit; +import lombok.SneakyThrows; import nu.marginalia.crawl.retreival.CrawlDataReference; -import nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.fetcher.ContentTags; +import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; +import nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawling.model.CrawledDocument; +import 
nu.marginalia.model.EdgeUrl; import javax.annotation.Nullable; -import java.time.LocalDateTime; public record DocumentWithReference( @Nullable CrawledDocument doc, @@ -18,17 +21,28 @@ public record DocumentWithReference( return emptyInstance; } - public boolean isContentBodySame(CrawledDocument newDoc) { + /** Returns true if the provided document is the same as the reference document, + * or if the result was retained via HTTP 304. + */ + public boolean isSame(HttpFetchResult result) { + if (result instanceof HttpFetchResult.ResultSame) + return true; + if (result instanceof HttpFetchResult.ResultRetained) + return true; + + if (!(result instanceof HttpFetchResult.ResultOk resultOk)) + return false; + if (reference == null) return false; if (doc == null) return false; if (doc.documentBody == null) return false; - if (newDoc.documentBody == null) - return false; - return reference.isContentBodySame(doc, newDoc); + return DocumentBodyExtractor.extractBody(resultOk) + .map((contentType, body) -> reference.isContentBodySame(doc.documentBody, body)) + .orElse(false); } public ContentTags getContentTags() { @@ -60,23 +74,4 @@ public record DocumentWithReference( return doc == null || reference == null; } - /** - * If the provided document has HTTP status 304, and the reference document is provided, - * return the reference document; otherwise return the provided document. - */ - public CrawledDocument replaceOn304(CrawledDocument fetchedDoc) { - - if (doc == null) - return fetchedDoc; - - // HTTP status 304 is NOT MODIFIED, which means the document is the same as it was when - // we fetched it last time. We can recycle the reference document. - if (fetchedDoc.httpStatus != 304) - return fetchedDoc; - - var ret = doc; - ret.recrawlState = CrawlerRevisitor.documentWasRetainedTag; - ret.timestamp = LocalDateTime.now().toString(); - return ret; - } } diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java index 55f2eebe..4faa2042 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java @@ -8,9 +8,7 @@ import okhttp3.Request; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.netpreserve.jwarc.WarcReader; -import org.netpreserve.jwarc.WarcRequest; -import org.netpreserve.jwarc.WarcResponse; +import org.netpreserve.jwarc.*; import java.io.IOException; import java.net.URISyntaxException; @@ -22,6 +20,7 @@ import java.util.Map; import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; class WarcRecorderTest { Path fileName; @@ -33,7 +32,7 @@ class WarcRecorderTest { .addNetworkInterceptor(new IpInterceptingNetworkInterceptor()) .build(); - fileName = Files.createTempFile("test", ".warc.gz"); + fileName = Files.createTempFile("test", ".warc"); client = new WarcRecorder(fileName); } @@ -73,10 +72,7 @@ class WarcRecorderTest { try (var recorder = new WarcRecorder(fileName)) { recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), - """ - Content-type: text/html - X-Cookies: 1 - """, + "text/html", 200, "test"); } @@ -95,5 +91,27 @@ class WarcRecorderTest { new 
GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out); } + @Test + public void testSaveImport() throws URISyntaxException, IOException { + try (var recorder = new WarcRecorder(fileName)) { + recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"), + "text/html", + 200, + "test"); + } + + try (var reader = new WarcReader(fileName)) { + WarcXResponseReference.register(reader); + + for (var record : reader) { + System.out.println(record.type()); + System.out.println(record.getClass().getSimpleName()); + if (record instanceof WarcXResponseReference rsp) { + assertEquals("https://www.marginalia.nu/", rsp.target()); + } + } + } + + } } \ No newline at end of file diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java index 2f3076cd..4590dde2 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java @@ -4,8 +4,10 @@ import lombok.SneakyThrows; import nu.marginalia.crawl.retreival.RateLimitException; import nu.marginalia.crawl.retreival.fetcher.ContentTags; import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl; +import nu.marginalia.crawling.body.DocumentBodyExtractor; +import nu.marginalia.crawling.body.DocumentBodyResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; -import nu.marginalia.crawl.retreival.logic.ContentTypeLogic; +import nu.marginalia.crawling.body.ContentTypeLogic; import nu.marginalia.model.EdgeUrl; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -33,10 +35,11 @@ class HttpFetcherTest { void fetchUTF8() throws URISyntaxException, RateLimitException, IOException { var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); try (var recorder = new WarcRecorder()) { - var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty()); - System.out.println(str.contentType); + var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty()); + if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) { + System.out.println(bodyOk.contentType()); + } } - } @Test @@ -44,8 +47,10 @@ class HttpFetcherTest { var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler"); try (var recorder = new WarcRecorder()) { - var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty()); - System.out.println(str.contentType); + var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty()); + if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) { + System.out.println(bodyOk.contentType()); + } } } } \ No newline at end of file diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java index 2a00e6de..b7727022 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java @@ -5,6 +5,7 @@ import lombok.SneakyThrows; import 
nu.marginalia.crawl.retreival.CrawlerRetreiver; import nu.marginalia.crawl.retreival.DomainProber; import nu.marginalia.crawl.retreival.fetcher.*; +import nu.marginalia.crawling.body.HttpFetchResult; import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder; import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawlerDocumentStatus; @@ -13,6 +14,7 @@ import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import nu.marginalia.test.CommonTestData; +import okhttp3.Headers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.mockito.Mockito; @@ -21,12 +23,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.net.URISyntaxException; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.function.Consumer; +import java.util.*; public class CrawlerMockFetcherTest { @@ -65,9 +62,9 @@ public class CrawlerMockFetcherTest { } - void crawl(CrawlSpecRecord spec, Consumer consumer) throws IOException { + void crawl(CrawlSpecRecord spec) throws IOException { try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder, consumer) + new CrawlerRetreiver(fetcherMock, new DomainProber(d -> true), spec, recorder) .fetch(); } } @@ -80,9 +77,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html"); registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html"); - crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()), out::add); - - out.forEach(System.out::println); + crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>())); } @Test @@ -91,9 +86,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html"); - crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()), out::add); - - out.forEach(System.out::println); + crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>())); } @Test @@ -104,9 +97,7 @@ public class CrawlerMockFetcherTest { registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html"); registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html"); - crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()), out::add); - - out.forEach(System.out::println); + crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>())); } class MockFetcher implements HttpFetcher { @@ -126,21 +117,23 @@ public class CrawlerMockFetcherTest { return new FetchResult(FetchResultState.OK, url); } + @SneakyThrows @Override - public CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) { + public HttpFetchResult fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) { logger.info("Fetching {}", url); if (mockData.containsKey(url)) { - return mockData.get(url); - } - else { - return CrawledDocument.builder() - .crawlId("1") - .url(url.toString()) - .contentType("text/html") - .httpStatus(404) - .crawlerStatus(CrawlerDocumentStatus.ERROR.name()) - .build(); + byte[] bodyBytes = 
mockData.get(url).documentBody.getBytes(); + return new HttpFetchResult.ResultOk( + url.asURI(), + 200, + new Headers.Builder().build(), + bodyBytes, + 0, + bodyBytes.length + ); } + + return new HttpFetchResult.ResultNone(); } @Override diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java index 59bf99f6..286f15f5 100644 --- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java +++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerRetreiverTest.java @@ -16,15 +16,14 @@ import nu.marginalia.crawling.model.CrawledDomain; import nu.marginalia.crawling.model.SerializableCrawlData; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import org.junit.jupiter.api.*; -import org.netpreserve.jwarc.WarcReader; -import org.netpreserve.jwarc.WarcRequest; -import org.netpreserve.jwarc.WarcResponse; +import org.netpreserve.jwarc.*; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.util.*; import java.util.stream.Collectors; +import java.util.zip.GZIPInputStream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -33,6 +32,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue; class CrawlerRetreiverTest { private HttpFetcher httpFetcher; + Path tempFile; + Path tempFile2; @BeforeEach public void setUp() { httpFetcher = new HttpFetcherImpl("search.marginalia.nu; testing a bit :D"); @@ -45,6 +46,15 @@ class CrawlerRetreiverTest { System.setProperty("http.agent", WmsaHome.getUserAgent().uaString()); } + @AfterEach + public void tearDown() throws IOException { + if (tempFile != null) { + Files.deleteIfExists(tempFile); + } + if (tempFile2 != null) { + Files.deleteIfExists(tempFile2); + } + } @Test public void testWarcOutput() throws IOException { var specs = CrawlSpecRecord @@ -57,10 +67,8 @@ class CrawlerRetreiverTest { try { tempFile = Files.createTempFile("crawling-process", "warc"); - List data = new ArrayList<>(); - try (var recorder = new WarcRecorder(tempFile)) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } @@ -93,7 +101,7 @@ class CrawlerRetreiverTest { } } @Test - public void testWithKnownDomains() { + public void testWithKnownDomains() throws IOException { var specs = CrawlSpecRecord .builder() .crawlDepth(5) @@ -103,15 +111,30 @@ class CrawlerRetreiverTest { List data = new ArrayList<>(); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + tempFile = Files.createTempFile("crawling-process", ".warc"); + + try (var recorder = new WarcRecorder(tempFile)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } + + try (var stream = CrawledDomainReader.createDataStream(tempFile)) { + while (stream.hasNext()) { + if (stream.next() instanceof CrawledDocument doc) { + data.add(doc); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + var fetchedUrls = - data.stream().filter(CrawledDocument.class::isInstance) + data.stream() + 
.peek(System.out::println) + .filter(CrawledDocument.class::isInstance) .map(CrawledDocument.class::cast) .map(doc -> doc.url) .collect(Collectors.toSet()); @@ -126,7 +149,7 @@ class CrawlerRetreiverTest { } @Test - public void testEmptySet() { + public void testEmptySet() throws IOException { var specs = CrawlSpecRecord .builder() @@ -135,15 +158,30 @@ class CrawlerRetreiverTest { .urls(List.of()) .build(); + List data = new ArrayList<>(); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, data::add).fetch(); + tempFile = Files.createTempFile("crawling-process", ".warc"); + + try (var recorder = new WarcRecorder(tempFile)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } + + try (var stream = CrawledDomainReader.createDataStream(tempFile)) { + while (stream.hasNext()) { + if (stream.next() instanceof CrawledDocument doc) { + data.add(doc); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + + data.stream().filter(CrawledDocument.class::isInstance) .map(CrawledDocument.class::cast) .forEach(doc -> System.out.println(doc.url + "\t" + doc.crawlerStatus + "\t" + doc.httpStatus)); @@ -174,43 +212,70 @@ class CrawlerRetreiverTest { .build(); - Path out = Files.createTempDirectory("crawling-process"); - var writer = new CrawledDomainWriter(out, specs.domain, "idid"); + tempFile = Files.createTempFile("crawling-process", ".warc.gz"); + tempFile2 = Files.createTempFile("crawling-process", ".warc.gz"); + Map, List> data = new HashMap<>(); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, d -> { - data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d); - if (d instanceof CrawledDocument doc) { - System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus); - if (Math.random() > 0.5) { - doc.headers = ""; - } - } - writer.accept(d); - }).fetch(); + try (var recorder = new WarcRecorder(tempFile)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(); } catch (IOException ex) { Assertions.fail(ex); } - - writer.close(); - - var reader = new CrawledDomainReader(); - var stream = reader.createDataStream(out, specs.domain, "idid"); + try (var stream = CrawledDomainReader.createDataStream(tempFile)) { + while (stream.hasNext()) { + var doc = stream.next(); + data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc); + } + } catch (Exception e) { + throw new RuntimeException(e); + } + var stream = CrawledDomainReader.createDataStream(tempFile); CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0); domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList()); - try (var recorder = new WarcRecorder()) { - new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder, d -> { - if (d instanceof CrawledDocument doc) { - System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus); - } - }).fetch(new DomainLinks(), new CrawlDataReference(stream)); + try (var recorder = new WarcRecorder(tempFile2)) { + new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch(new DomainLinks(), + new CrawlDataReference(stream)); } catch (IOException ex) { Assertions.fail(ex); } + + new GZIPInputStream(Files.newInputStream(tempFile2)).transferTo(System.out); 
+ + try (var reader = new WarcReader(tempFile2)) { + WarcXResponseReference.register(reader); + + reader.forEach(record -> { + if (record instanceof WarcResponse rsp) { + try { + System.out.println(rsp.type() + ":" + rsp.target() + "/" + rsp.http().status()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + if (record instanceof WarcMetadata rsp) { + System.out.println("meta:" + rsp.target()); + } + }); + } + + try (var ds = CrawledDomainReader.createDataStream(tempFile2)) { + while (ds.hasNext()) { + var doc = ds.next(); + if (doc instanceof CrawledDomain dr) { + System.out.println(dr.domain + "/" + dr.crawlerStatus); + } + else if (doc instanceof CrawledDocument dc) { + System.out.println(dc.url + "/" + dc.crawlerStatus + "/" + dc.httpStatus); + } + } + } catch (Exception e) { + throw new RuntimeException(e); + + } } } \ No newline at end of file diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java index 0af77acb..353ef965 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportAtagsActor.java @@ -63,8 +63,6 @@ public class ExportAtagsActor extends RecordActorPrototype { Path inputDir = storageService.getStorage(crawlId).asPath(); - var reader = new CrawledDomainReader(); - try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING)))); ) { @@ -78,7 +76,7 @@ public class ExportAtagsActor extends RecordActorPrototype { } Path crawlDataPath = inputDir.resolve(item.relPath()); - try (var stream = reader.createDataStream(crawlDataPath)) { + try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) { exportLinks(tagWriter, stream); } catch (Exception ex) { diff --git a/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java b/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java index 1a73a952..4322d3fc 100644 --- a/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java +++ b/code/tools/crawl-data-unfcker/src/main/java/nu/marginalia/tools/CrawlDataUnfcker.java @@ -29,13 +29,11 @@ public class CrawlDataUnfcker { return; } - var reader = new CrawledDomainReader(); - try (var wl = new WorkLog(output.resolve("crawler.log"))) { for (var inputItem : WorkLog.iterable(input.resolve("crawler.log"))) { Path inputPath = input.resolve(inputItem.relPath()); - var domainMaybe = readDomain(reader, inputPath).map(CrawledDomain::getDomain); + var domainMaybe = readDomain(inputPath).map(CrawledDomain::getDomain); if (domainMaybe.isEmpty()) continue; var domain = domainMaybe.get(); @@ -43,7 +41,7 @@ public class CrawlDataUnfcker { // Generate conformant ID String newId = Integer.toHexString(domain.hashCode()); - var outputPath = CrawlerOutputFile.createOutputPath(output, newId, domain); + var outputPath = CrawlerOutputFile.createLegacyOutputPath(output, newId, domain); var outputFileName = outputPath.toFile().getName(); System.out.println(inputPath + " -> " + outputPath); @@ -56,13 +54,13 @@ public class CrawlDataUnfcker { } } - static Optional readDomain(CrawledDomainReader reader, Path file) { + static Optional readDomain(Path file) { if (!Files.exists(file)) { 
System.out.println("Missing file " + file); return Optional.empty(); } - try (var stream = reader.createDataStream(file)) { + try (var stream = CrawledDomainReader.createDataStream(file)) { while (stream.hasNext()) { if (stream.next() instanceof CrawledDomain domain) { return Optional.of(domain); diff --git a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java index 97df4a39..c5751a7a 100644 --- a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java +++ b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/ExperimentRunnerMain.java @@ -50,10 +50,9 @@ public class ExperimentRunnerMain { experiment.args(Arrays.copyOfRange(args, 2, args.length)); Path basePath = Path.of(args[0]); - var reader = new CrawledDomainReader(); for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) { Path crawlDataPath = basePath.resolve(item.relPath()); - try (var stream = reader.createDataStream(crawlDataPath)) { + try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) { experiment.process(stream); } catch (Exception ex) { diff --git a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java index 4e61ffc4..5d7d8d11 100644 --- a/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java +++ b/code/tools/experiment-runner/src/main/java/nu/marginalia/tools/LegacyExperiment.java @@ -5,12 +5,12 @@ import nu.marginalia.crawling.model.CrawledDocument; import nu.marginalia.crawling.model.CrawledDomain; import java.io.IOException; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; public abstract class LegacyExperiment extends Experiment { public abstract boolean process(CrawledDomain domain); + @Override public boolean process(SerializableCrawlDataStream dataStream) throws IOException { List documentList = new ArrayList<>(); diff --git a/settings.gradle b/settings.gradle index af8a45f5..42ae0f47 100644 --- a/settings.gradle +++ b/settings.gradle @@ -155,7 +155,7 @@ dependencyResolutionManagement { library('duckdb', 'org.duckdb', 'duckdb_jdbc').version('0.9.1') library('okhttp3','com.squareup.okhttp3','okhttp').version('4.11.0') - library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.4') + library('jwarc', 'org.netpreserve', 'jwarc').version('0.28.5') library('httpcomponents.core','org.apache.httpcomponents','httpcore').version('4.4.15') library('httpcomponents.client','org.apache.httpcomponents','httpclient').version('4.5.13')
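For downstream consumers, the net effect of the tool and actor changes above is that CrawledDomainReader.createDataStream(Path) is now a static entry point for reading crawl data back. A minimal consumer, mirroring the test code earlier in this diff (the path is a placeholder, and the reader's package is assumed from the surrounding codebase):

    import nu.marginalia.crawling.io.CrawledDomainReader;   // package assumed
    import nu.marginalia.crawling.model.CrawledDocument;
    import nu.marginalia.crawling.model.CrawledDomain;

    import java.nio.file.Path;

    class CrawlDataReadSketch {
        public static void main(String[] args) throws Exception {
            Path crawlDataPath = Path.of("crawl-data.warc.gz");   // placeholder path

            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
                while (stream.hasNext()) {
                    var item = stream.next();
                    if (item instanceof CrawledDomain domain) {
                        System.out.println(domain.domain + "/" + domain.crawlerStatus);
                    }
                    else if (item instanceof CrawledDocument doc) {
                        System.out.println(doc.url + "/" + doc.crawlerStatus + "/" + doc.httpStatus);
                    }
                }
            }
        }
    }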