From 1328bc4938b63e9910d2bf9d1eae086ad7811d5 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Thu, 14 Dec 2023 16:05:48 +0100
Subject: [PATCH] (warc) Clean up parquet conversion

This commit cleans up the warc->parquet conversion. Records with an HTTP
status other than 200 are now included.

The commit also fixes a bug where the robots.txt parser would be fed the
full HTTP response (and choke), instead of just the body.

The DocumentBodyExtractor code has also been cleaned up, and now offers a
way of getting just the byte[] representation for later processing, as
conversion to and from strings is a bit wasteful.
---
 .../crawling/body/DocumentBodyExtractor.java  | 58 ++++++++++++----
 .../crawling/body/DocumentBodyResult.java     | 30 +++++++--
 .../crawling/body/HttpFetchResult.java        | 66 +++++++---------
 .../crawling/io/CrawlerOutputFile.java        | 40 ++++++----
 ...arcReadingSerializableCrawlDataStream.java |  7 +-
 .../parquet/CrawledDocumentParquetRecord.java |  4 ++
 ...rawledDocumentParquetRecordFileWriter.java | 61 ++++++++++++++++
 ...edDocumentParquetRecordFileWriterTest.java |  3 +
 .../java/nu/marginalia/crawl/CrawlerMain.java |  6 +-
 .../crawl/retreival/CrawlerRetreiver.java     |  2 +-
 .../retreival/CrawlerWarcResynchronizer.java  | 33 +++------
 .../retreival/fetcher/HttpFetcherImpl.java    | 21 +++---
 .../warc/WarcProtocolReconstructor.java       |  5 +-
 .../retreival/fetcher/warc/WarcRecorder.java  |  3 +-
 .../revisit/DocumentWithReference.java        | 12 ++--
 .../retreival/fetcher/WarcRecorderTest.java   | 54 ++++++++++----
 .../marginalia/crawling/HttpFetcherTest.java  |  4 +-
 .../retreival/CrawlerMockFetcherTest.java     |  1 +
 18 files changed, 278 insertions(+), 132 deletions(-)
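The extractor now has two entry points: asString() for consumers that need a
decoded document, and asBytes() for consumers that can work on the raw payload,
such as the robots.txt handling and the parquet conversion below. A minimal
sketch of the intended call pattern (the recorder, client and request setup is
assumed from the surrounding crawler code, not part of this patch):

    HttpFetchResult result = recorder.fetch(client, request);

    // Decoded, charset- and content-type-checked body:
    DocumentBodyResult<String> text = DocumentBodyExtractor.asString(result);

    // Raw bytes: gzip and BOM are handled, but no string conversion happens.
    Optional<Integer> size = DocumentBodyExtractor.asBytes(result)
            .mapOpt((contentType, bytes) -> bytes.length);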
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java
index 7bb548e5..00ceac86 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java
@@ -11,20 +11,54 @@ import java.io.IOException;
 import java.util.zip.GZIPInputStream;

 public class DocumentBodyExtractor {
-    private static ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
+    private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();

     private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class);

-    public static DocumentBodyResult extractBody(HttpFetchResult result) {
-        if (result instanceof HttpFetchResult.ResultOk fetchOk) {
-            return extractBody(fetchOk);
+    public static DocumentBodyResult<String> asString(HttpFetchResult result) {
+        if (result instanceof HttpFetchResult.ResultOk ok) {
+            return asString(ok);
         }
-        else {
-            return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, "");
+        else if (result instanceof HttpFetchResult.ResultRetained retained) {
+            return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body());
         }
+
+        return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
+    }
+
+    public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult result) {
+        if (result instanceof HttpFetchResult.ResultOk fetchOk) {
+            return asBytes(fetchOk);
+        }
+        else if (result instanceof HttpFetchResult.ResultRetained retained) {
+            return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body().getBytes());
+        }
+
+        return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
+    }
+
+    public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult.ResultOk rsp) {
+        try {
+            var byteStream = rsp.getInputStream();
+
+            if ("gzip".equals(rsp.header("Content-Encoding"))) {
+                byteStream = new GZIPInputStream(byteStream);
+            }
+            byteStream = new BOMInputStream(byteStream);
+
+            var contentTypeHeader = rsp.header("Content-Type");
+
+            byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
+            var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
+
+            return new DocumentBodyResult.Ok<>(contentType.contentType(), data);
+        } catch (Exception ex) {
+            logger.error("Failed to extract body", ex);
+            return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "");
+        }
     }

-    public static DocumentBodyResult extractBody(HttpFetchResult.ResultOk rsp) {
+    public static DocumentBodyResult<String> asString(HttpFetchResult.ResultOk rsp) {
         try {
             var byteStream = rsp.getInputStream();

@@ -35,25 +69,25 @@ public class DocumentBodyExtractor {
             var contentTypeHeader = rsp.header("Content-Type");
             if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
-                return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
+                return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
             }

             byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder

             var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
             if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
-                return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
+                return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
             }

             if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
-                return new DocumentBodyResult.Error(CrawlerDocumentStatus.BAD_CHARSET, "");
+                return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CHARSET, "");
             }

-            return new DocumentBodyResult.Ok(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
+            return new DocumentBodyResult.Ok<>(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
         } catch (IOException ex) {
             logger.error("Failed to extract body", ex);
-            return new DocumentBodyResult.Error(CrawlerDocumentStatus.ERROR, "");
+            return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "");
         }
     }
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java
index 1959f844..0f30dc1f 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyResult.java
@@ -5,19 +5,35 @@ import nu.marginalia.crawling.model.CrawlerDocumentStatus;
 import java.util.Optional;
 import java.util.function.BiFunction;

-public sealed interface DocumentBodyResult {
-    record Ok(String contentType, String body) implements DocumentBodyResult {
+public sealed interface DocumentBodyResult<T> {
+    record Ok<T>(String contentType, T body) implements DocumentBodyResult<T> {
+
         @Override
-        public <T> Optional<T> map(BiFunction<String, String, T> fun) {
-            return Optional.of(fun.apply(contentType, body));
+        public <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper) {
+            return Optional.of(mapper.apply(contentType, body));
+        }
+
+        @Override
+        public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
+            consumer.accept(contentType, body);
         }
     }
-    record Error(CrawlerDocumentStatus status, String why) implements DocumentBodyResult {
+    record Error<T>(CrawlerDocumentStatus status, String why) implements DocumentBodyResult<T> {
         @Override
-        public <T> Optional<T> map(BiFunction<String, String, T> fun) {
+        public <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper) {
             return Optional.empty();
         }
+
+        @Override
+        public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
+        }
     }

-    <T> Optional<T> map(BiFunction<String, String, T> fun);
+    <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper);
+
+    void ifPresent(ExConsumer<T, Exception> consumer) throws Exception;
+
+    interface ExConsumer<T, E extends Exception> {
+        void accept(String contentType, T t) throws E;
+    }
 }
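The ExConsumer interface exists so a caller can consume the body with a lambda
that throws checked exceptions, which java.util.function.BiConsumer would not
permit. This mirrors how CrawlerWarcResynchronizer uses it further down in this
patch (url and crawlFrontier as in that class):

    DocumentBodyExtractor.asString(response).ifPresent((contentType, body) -> {
        // Anything thrown here propagates to the caller of ifPresent().
        var doc = Jsoup.parse(body);
        crawlFrontier.enqueueLinksFromDocument(url, doc);
    });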
"text/html".equalsIgnoreCase(ok.contentType()) + return switch(DocumentBodyExtractor.asString(this)) { + case DocumentBodyResult.Ok ok when "text/html".equalsIgnoreCase(ok.contentType()) -> Optional.of(Jsoup.parse(ok.body())); default -> Optional.empty(); }; @@ -105,7 +101,7 @@ public sealed interface HttpFetchResult { }; - record ResultRetained(String url, String body) implements HttpFetchResult { + record ResultRetained(String url, String contentType, String body) implements HttpFetchResult { public boolean isOk() { return true; diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java index 907eb081..2a0029b4 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java @@ -10,9 +10,7 @@ public class CrawlerOutputFile { /** Return the Path to a file for the given id and name */ public static Path getLegacyOutputFile(Path base, String id, String name) { - if (id.length() < 4) { - id = Strings.repeat("0", 4 - id.length()) + id; - } + id = padId(id); String first = id.substring(0, 2); String second = id.substring(2, 4); @@ -24,9 +22,7 @@ public class CrawlerOutputFile { /** Return the Path to a file for the given id and name, creating the prerequisite * directory structure as necessary. */ public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException { - if (id.length() < 4) { - id = Strings.repeat("0", 4 - id.length()) + id; - } + id = padId(id); String first = id.substring(0, 2); String second = id.substring(2, 4); @@ -54,9 +50,7 @@ public class CrawlerOutputFile { } public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException { - if (id.length() < 4) { - id = Strings.repeat("0", 4 - id.length()) + id; - } + id = padId(id); String first = id.substring(0, 2); String second = id.substring(2, 4); @@ -68,10 +62,20 @@ public class CrawlerOutputFile { return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz"); } - public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) { - if (id.length() < 4) { - id = Strings.repeat("0", 4 - id.length()) + id; + public static Path createParquetPath(Path basePath, String id, String domain) throws IOException { + id = padId(id); + + String first = id.substring(0, 2); + String second = id.substring(2, 4); + + Path destDir = basePath.resolve(first).resolve(second); + if (!Files.exists(destDir)) { + Files.createDirectories(destDir); } + return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.parquet"); + } + public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) { + id = padId(id); String first = id.substring(0, 2); String second = id.substring(2, 4); @@ -80,6 +84,18 @@ public class CrawlerOutputFile { return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}"); } + /** + * Pads the given ID with leading zeros to ensure it has a length of 4 characters. 
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
index 907eb081..2a0029b4 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/CrawlerOutputFile.java
@@ -10,9 +10,7 @@ public class CrawlerOutputFile {

     /** Return the Path to a file for the given id and name */
     public static Path getLegacyOutputFile(Path base, String id, String name) {
-        if (id.length() < 4) {
-            id = Strings.repeat("0", 4 - id.length()) + id;
-        }
+        id = padId(id);

         String first = id.substring(0, 2);
         String second = id.substring(2, 4);
@@ -24,9 +22,7 @@ public class CrawlerOutputFile {
     /** Return the Path to a file for the given id and name, creating the prerequisite
      * directory structure as necessary. */
     public static Path createLegacyOutputPath(Path base, String id, String name) throws IOException {
-        if (id.length() < 4) {
-            id = Strings.repeat("0", 4 - id.length()) + id;
-        }
+        id = padId(id);

         String first = id.substring(0, 2);
         String second = id.substring(2, 4);
@@ -54,9 +50,7 @@ public class CrawlerOutputFile {
     }

     public static Path createWarcPath(Path basePath, String id, String domain, WarcFileVersion version) throws IOException {
-        if (id.length() < 4) {
-            id = Strings.repeat("0", 4 - id.length()) + id;
-        }
+        id = padId(id);

         String first = id.substring(0, 2);
         String second = id.substring(2, 4);
@@ -68,10 +62,20 @@ public class CrawlerOutputFile {
         return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}-\{version.suffix}.warc.gz");
     }

-    public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
-        if (id.length() < 4) {
-            id = Strings.repeat("0", 4 - id.length()) + id;
+    public static Path createParquetPath(Path basePath, String id, String domain) throws IOException {
+        id = padId(id);
+
+        String first = id.substring(0, 2);
+        String second = id.substring(2, 4);
+
+        Path destDir = basePath.resolve(first).resolve(second);
+        if (!Files.exists(destDir)) {
+            Files.createDirectories(destDir);
         }
+        return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.parquet");
+    }
+
+    public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
+        id = padId(id);

         String first = id.substring(0, 2);
         String second = id.substring(2, 4);
@@ -80,6 +84,18 @@ public class CrawlerOutputFile {
         return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.warc\{version.suffix}");
     }

+    /**
+     * Pads the given ID with leading zeros to ensure it has a length of 4 characters.
+     */
+    private static String padId(String id) {
+        if (id.length() < 4) {
+            id = Strings.repeat("0", 4 - id.length()) + id;
+        }
+
+        return id;
+    }
+
+
     public enum WarcFileVersion {
         LIVE("open"),
         TEMP("tmp"),
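The parquet file thus lands in the same fan-out directory layout as the WARC
data: an id like "123" is zero-padded to "0123" and split into two path
components, so createParquetPath(base, "123", "example.com") resolves to
roughly base/01/23/0123-<safe domain name>.parquet, with the exact file name
depending on what filesystemSafeName() does to the domain.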
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java
index 9d8d1a63..9c81f0ca 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/io/format/WarcReadingSerializableCrawlDataStream.java
@@ -88,10 +88,9 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
         if (http.status() != 200) {
             return;
         }
-        CrawledDocument document;
-        var parsedBody = DocumentBodyExtractor.extractBody(HttpFetchResult.importWarc(response));
-        if (parsedBody instanceof DocumentBodyResult.Error error) {
+        var parsedBody = DocumentBodyExtractor.asString(HttpFetchResult.importWarc(response));
+        if (parsedBody instanceof DocumentBodyResult.Error<String> error) {
             next = new CrawledDocument(
                     "",
                     response.targetURI().toString(),
@@ -106,7 +105,7 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
                     "",
                     "",
                     "");
-        } else if (parsedBody instanceof DocumentBodyResult.Ok ok) {
+        } else if (parsedBody instanceof DocumentBodyResult.Ok<String> ok) {
             next = new CrawledDocument(
                     "",
                     response.targetURI().toString(),
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java
index 614be635..6e0e5a0b 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java
@@ -22,6 +22,7 @@ public class CrawledDocumentParquetRecord {
     public String url;
     public String ip;
     public boolean cookies;
+    public int httpStatus;
     public String contentType;
     public byte[] body;
@@ -39,6 +40,7 @@ public class CrawledDocumentParquetRecord {
             Types.required(BINARY).as(stringType()).named("url"),
             Types.required(BINARY).as(stringType()).named("ip"),
             Types.required(BOOLEAN).named("cookies"),
+            Types.required(INT32).named("httpStatus"),
             Types.required(BINARY).as(stringType()).named("contentType"),
             Types.required(BINARY).named("body")
     );
@@ -49,6 +51,7 @@ public class CrawledDocumentParquetRecord {
             case "domain" -> domain = (String) value;
             case "url" -> url = (String) value;
             case "ip" -> ip = (String) value;
+            case "httpStatus" -> httpStatus = (Integer) value;
             case "cookies" -> cookies = (Boolean) value;
             case "contentType" -> contentType = (String) value;
             case "body" -> body = (byte[]) value;
@@ -61,6 +64,7 @@ public class CrawledDocumentParquetRecord {
         valueWriter.write("domain", domain);
         valueWriter.write("url", url);
         valueWriter.write("ip", ip);
+        valueWriter.write("httpStatus", httpStatus);
         valueWriter.write("cookies", cookies);
         valueWriter.write("contentType", contentType);
         valueWriter.write("body", body);
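With the status code stored per document, a consumer can filter error transfers
directly from the parquet file. A sketch using the reader's stream interface
(the predicate is illustrative):

    var errorUrls = CrawledDocumentParquetRecordFileReader
            .stream(parquetPath)
            .filter(doc -> doc.httpStatus != 200)
            .map(doc -> doc.url)
            .toList();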
diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriter.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriter.java
index f4961c01..d9fea865 100644
--- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriter.java
+++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriter.java
@@ -1,12 +1,37 @@
 package nu.marginalia.crawling.parquet;

 import blue.strategic.parquet.ParquetWriter;
+import nu.marginalia.crawling.body.DocumentBodyExtractor;
+import nu.marginalia.crawling.body.DocumentBodyResult;
+import nu.marginalia.crawling.body.HttpFetchResult;
+import org.netpreserve.jwarc.WarcReader;
+import org.netpreserve.jwarc.WarcRecord;
+import org.netpreserve.jwarc.WarcResponse;
+import org.netpreserve.jwarc.WarcXResponseReference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 import java.io.IOException;
 import java.nio.file.Path;

 public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
     private final ParquetWriter<CrawledDocumentParquetRecord> writer;
+    private static final Logger logger = LoggerFactory.getLogger(CrawledDocumentParquetRecordFileWriter.class);
+
+    public static void convertWarc(String domain, Path warcInputFile, Path parquetOutputFile) throws IOException {
+        try (var warcReader = new WarcReader(warcInputFile);
+             var parquetWriter = new CrawledDocumentParquetRecordFileWriter(parquetOutputFile)
+        ) {
+            WarcXResponseReference.register(warcReader);
+
+            for (var record : warcReader) {
+                parquetWriter.write(domain, record);
+            }
+        }
+        catch (Exception ex) {
+            logger.error("Failed to convert WARC file to Parquet", ex);
+        }
+    }

     public CrawledDocumentParquetRecordFileWriter(Path file) throws IOException {
         writer = ParquetWriter.writeFile(CrawledDocumentParquetRecord.schema,
@@ -17,6 +42,42 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
         writer.write(domainData);
     }

+    public void write(String domain, WarcRecord record) throws IOException {
+        if (!(record instanceof WarcResponse ref)) {
+            return;
+        }
+
+        HttpFetchResult result = HttpFetchResult.importWarc(ref);
+        if (!(result instanceof HttpFetchResult.ResultOk fetchOk)) {
+            return;
+        }
+
+        byte[] bodyBytes;
+        String contentType;
+
+        var body = DocumentBodyExtractor.asBytes(result);
+
+        if (body instanceof DocumentBodyResult.Ok<byte[]> bodyOk) {
+            bodyBytes = bodyOk.body();
+            contentType = bodyOk.contentType();
+        }
+        else {
+            bodyBytes = new byte[0];
+            contentType = "";
+        }
+
+        write(new CrawledDocumentParquetRecord(
+                domain,
+                ref.target(),
+                fetchOk.ipAddress(),
+                false, // FIXME
+                fetchOk.statusCode(),
+                contentType,
+                bodyBytes)
+        );
+    }
+
+
     public void close() throws IOException {
         writer.close();
     }
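Conversion is a single call once a WARC file has been finalized; this is the
same invocation CrawlerMain makes below after moving the WARC into place. Note
that convertWarc() logs and swallows conversion failures rather than
propagating them (the domain and paths here are placeholders):

    CrawledDocumentParquetRecordFileWriter
            .convertWarc("www.example.com", warcFile, parquetFile);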
diff --git a/code/process-models/crawling-model/src/test/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java b/code/process-models/crawling-model/src/test/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java
index 07a27200..f8661355 100644
--- a/code/process-models/crawling-model/src/test/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java
+++ b/code/process-models/crawling-model/src/test/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java
@@ -3,6 +3,7 @@ package nu.marginalia.crawling.parquet;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.netpreserve.jwarc.net.WarcRecorder;

 import java.io.IOException;
 import java.nio.file.Files;
@@ -29,6 +30,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
                 "https://www.marginalia.nu/",
                 "127.0.0.1",
                 false,
+                200,
                 "text/html",
                 "hello world".getBytes());

@@ -41,4 +43,5 @@ class CrawledDocumentParquetRecordFileWriterTest {
             assertEquals(original, actual);
         }
     }
+
 }
\ No newline at end of file
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index f5b5a10e..5c6241f7 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -19,6 +19,7 @@ import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
 import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
 import nu.marginalia.crawling.io.CrawledDomainReader;
 import nu.marginalia.crawling.io.CrawlerOutputFile;
+import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
 import nu.marginalia.crawlspec.CrawlSpecFileNames;
 import nu.marginalia.storage.FileStorageService;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
@@ -29,7 +30,6 @@ import nu.marginalia.mq.inbox.MqSingleShotInbox;
 import nu.marginalia.process.control.ProcessHeartbeatImpl;
 import nu.marginalia.process.log.WorkLog;
 import nu.marginalia.service.module.DatabaseModule;
-import nu.marginalia.crawling.io.CrawledDomainWriter;
 import nu.marginalia.crawl.retreival.CrawlerRetreiver;
 import nu.marginalia.util.SimpleBlockingThreadPool;
 import okhttp3.ConnectionPool;
@@ -216,6 +216,7 @@ public class CrawlerMain {
             Path newWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.LIVE);
             Path tempFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.TEMP);
             Path finalWarcFile = CrawlerOutputFile.createWarcPath(outputDir, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
+            Path parquetFile = CrawlerOutputFile.createParquetPath(outputDir, id, domain);

             if (Files.exists(newWarcFile)) {
                 Files.move(newWarcFile, tempFile, StandardCopyOption.REPLACE_EXISTING);
@@ -245,6 +246,9 @@ public class CrawlerMain {

                 Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING);

+                CrawledDocumentParquetRecordFileWriter
+                        .convertWarc(domain, finalWarcFile, parquetFile);
+
                 workLog.setJobToFinished(domain, finalWarcFile.toString(), size);
                 heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
index 514243ee..80d6853b 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerRetreiver.java
@@ -251,7 +251,7 @@ public class CrawlerRetreiver implements AutoCloseable {
             var doc = reference.doc();
             if (doc != null) {
                 warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody);
-                fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.documentBody);
+                fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.contentType, doc.documentBody);
             }
         }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java
index 1a66c7a5..47b5b2d8 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/CrawlerWarcResynchronizer.java
@@ -33,6 +33,8 @@ public class CrawlerWarcResynchronizer {
     public void run(Path tempFile) {
         // First pass, enqueue links
         try (var reader = new WarcReader(tempFile)) {
+            WarcXResponseReference.register(reader);
+
             for (var item : reader) {
                 accept(item);
             }
@@ -54,8 +56,6 @@ public class CrawlerWarcResynchronizer {
         try {
             if (item instanceof WarcResponse rsp) {
                 response(rsp);
-            } else if (item instanceof WarcRevisit revisit) {
-                revisit(revisit);
             } else if (item instanceof WarcRequest req) {
                 request(req);
             }
@@ -76,35 +76,18 @@ public class CrawlerWarcResynchronizer {
         try {
             var response = HttpFetchResult.importWarc(rsp);
-            if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) {
-                var doc = Jsoup.parse(ok.body());
+            DocumentBodyExtractor
+                    .asString(response)
+                    .ifPresent((ct, body) ->
+                    {
+                        var doc = Jsoup.parse(body);
                 crawlFrontier.enqueueLinksFromDocument(url, doc);
-            }
+                    });
         }
         catch (Exception e) {
             logger.info(STR."Failed to parse response body for \{url}", e);
         }
     }

-    private void revisit(WarcRevisit revisit) throws IOException {
-        if (!WarcRecorder.documentRevisitURN.equals(revisit.profile())) {
-            return;
-        }
-
-        var url = new EdgeUrl(revisit.targetURI());
-
-        crawlFrontier.addVisited(url);
-
-        try {
-            var response = HttpFetchResult.importWarc(revisit);
-            if (DocumentBodyExtractor.extractBody(response) instanceof DocumentBodyResult.Ok ok) {
-                var doc = Jsoup.parse(ok.body());
-                crawlFrontier.enqueueLinksFromDocument(url, doc);
-            }
-        }
-        catch (Exception e) {
-            logger.info(STR."Failed to parse response body for \{url}", e);
-        }
-    }
 }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
index d7732baa..f8f11b13 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/HttpFetcherImpl.java
@@ -11,6 +11,8 @@ import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeR
 import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory;
 import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
 import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL;
+import nu.marginalia.crawling.body.DocumentBodyExtractor;
+import nu.marginalia.crawling.body.DocumentBodyResult;
 import nu.marginalia.crawling.body.HttpFetchResult;
 import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawling.body.ContentTypeLogic;
@@ -263,24 +265,19 @@ public class HttpFetcherImpl implements HttpFetcher {

             HttpFetchResult result = recorder.fetch(client, getBuilder.build());

-            if (result instanceof HttpFetchResult.ResultOk ok) {
-                return Optional.of(parseRobotsTxt(ok));
-            }
-            else {
-                return Optional.empty();
-            }
+            return DocumentBodyExtractor.asBytes(result).mapOpt((contentType, body) ->
+                    robotsParser.parseContent(url.toString(),
+                            body,
+                            contentType,
+                            userAgent)
+            );
+
         }
         catch (Exception ex) {
             return Optional.empty();
         }
     }

-    private SimpleRobotRules parseRobotsTxt(HttpFetchResult.ResultOk ok) {
-        return robotsParser.parseContent(ok.uri().toString(),
-                ok.bytesRaw(),
-                ok.header("Content-Type"),
-                userAgent);
-    }
 }
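This is the robots.txt fix from the commit message: the removed parseRobotsTxt()
passed ok.bytesRaw(), which (per the commit message) holds the full recorded
HTTP response rather than just the body, so the robots parser choked on the
envelope. Routing the result through DocumentBodyExtractor.asBytes() hands it
body bytes only, with gzip decoded and the content type sniffed from the
payload. Schematically:

    // before: the whole response buffer, headers and all
    robotsParser.parseContent(url, ok.bytesRaw(), ok.header("Content-Type"), userAgent);

    // after: body bytes only
    DocumentBodyExtractor.asBytes(result).mapOpt((contentType, body) ->
            robotsParser.parseContent(url.toString(), body, contentType, userAgent));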
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java
index 368bf3c7..2ceb076d 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java
@@ -6,6 +6,8 @@ import okhttp3.Response;
 import org.apache.commons.lang3.StringUtils;

 import java.net.URI;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import java.util.Map;
 import java.util.StringJoiner;
@@ -18,7 +20,8 @@ public class WarcProtocolReconstructor {

     static String getHttpRequestString(Request request, URI uri) {
         StringBuilder requestStringBuilder = new StringBuilder();
-        requestStringBuilder.append(request.method()).append(" ").append(uri.getPath());
+        requestStringBuilder.append(request.method()).append(" ").append(URLEncoder.encode(uri.getPath(), StandardCharsets.UTF_8));
+
         if (uri.getQuery() != null) {
             requestStringBuilder.append("?").append(uri.getQuery());
         }
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java
index b7bb82bd..bce1b890 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java
@@ -29,8 +29,6 @@ import java.util.*;
  * be reconstructed.
  */
 public class WarcRecorder implements AutoCloseable {
-    public static final URI documentRevisitURN = URI.create("urn:marginalia/data/doc/revisit");
-
     public static final URI documentRobotsTxtSkippedURN = URI.create("urn:marginalia/meta/doc/robots-txt-skipped");
     public static final URI documentBadContentTypeURN = URI.create("urn:marginalia/meta/doc/content-type-failed-probe");
     public static final URI documentProbeTimeout = URI.create("urn:marginalia/meta/doc/timeout-probe");
@@ -173,6 +171,7 @@ public class WarcRecorder implements AutoCloseable {
             return new HttpFetchResult.ResultOk(uri,
                     response.code(),
                     response.headers(),
+                    ip,
                     responseDataBuffer.data,
                     dataStart,
                     responseDataBuffer.length() - dataStart);
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java
index 03b96760..31df5e0e 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/revisit/DocumentWithReference.java
@@ -1,13 +1,11 @@
 package nu.marginalia.crawl.retreival.revisit;

-import lombok.SneakyThrows;
 import nu.marginalia.crawl.retreival.CrawlDataReference;
 import nu.marginalia.crawl.retreival.fetcher.ContentTags;
-import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
 import nu.marginalia.crawling.body.DocumentBodyExtractor;
+import nu.marginalia.crawling.body.DocumentBodyResult;
 import nu.marginalia.crawling.body.HttpFetchResult;
 import nu.marginalia.crawling.model.CrawledDocument;
-import nu.marginalia.model.EdgeUrl;

 import javax.annotation.Nullable;

@@ -40,9 +38,11 @@ public record DocumentWithReference(
         if (doc.documentBody == null)
             return false;

-        return DocumentBodyExtractor.extractBody(resultOk)
-                .map((contentType, body) -> reference.isContentBodySame(doc.documentBody, body))
-                .orElse(false);
+        if (!(DocumentBodyExtractor.asString(resultOk) instanceof DocumentBodyResult.Ok<String> bodyOk)) {
+            return false;
+        }
+
+        return reference.isContentBodySame(doc.documentBody, bodyOk.body());
     }

     public ContentTags getContentTags() {
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java
index 4faa2042..e8ba9437 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawl/retreival/fetcher/WarcRecorderTest.java
@@ -2,6 +2,8 @@ package nu.marginalia.crawl.retreival.fetcher;

 import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
 import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
+import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileReader;
+import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
 import nu.marginalia.model.EdgeUrl;
 import okhttp3.OkHttpClient;
 import okhttp3.Request;
@@ -20,10 +22,10 @@ import java.util.Map;
 import java.util.zip.GZIPInputStream;

 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertInstanceOf;

 class WarcRecorderTest {
-    Path fileName;
+    Path fileNameWarc;
+    Path fileNameParquet;
     WarcRecorder client;
     OkHttpClient httpClient;

     @BeforeEach
@@ -32,14 +34,16 @@ class WarcRecorderTest {
                 .addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
                 .build();

-        fileName = Files.createTempFile("test", ".warc");
-        client = new WarcRecorder(fileName);
+        fileNameWarc = Files.createTempFile("test", ".warc");
+        fileNameParquet = Files.createTempFile("test", ".parquet");
+
+        client = new WarcRecorder(fileNameWarc);
     }

     @AfterEach
     public void tearDown() throws Exception {
         client.close();
-        Files.delete(fileName);
+        Files.delete(fileNameWarc);
     }

     @Test
@@ -49,10 +53,10 @@ class WarcRecorderTest {
                 .addHeader("Accept-Encoding", "gzip")
                 .get().build());

-        new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out);
+        new GZIPInputStream(Files.newInputStream(fileNameWarc)).transferTo(System.out);

         Map<String, String> sampleData = new HashMap<>();
-        try (var warcReader = new WarcReader(fileName)) {
+        try (var warcReader = new WarcReader(fileNameWarc)) {
             warcReader.forEach(record -> {
                 if (record instanceof WarcRequest req) {
                     sampleData.put(record.type(), req.target());
@@ -70,14 +74,14 @@ class WarcRecorderTest {

     @Test
     public void flagAsSkipped() throws IOException, URISyntaxException {
-        try (var recorder = new WarcRecorder(fileName)) {
+        try (var recorder = new WarcRecorder(fileNameWarc)) {
             recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
                     "text/html",
                     200,
                     "test");
         }

-        try (var reader = new WarcReader(fileName)) {
+        try (var reader = new WarcReader(fileNameWarc)) {
             for (var record : reader) {
                 if (record instanceof WarcResponse rsp) {
                     assertEquals("https://www.marginalia.nu/", rsp.target());
@@ -88,19 +92,19 @@ class WarcRecorderTest {
             }
         }

-        new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out);
+        new GZIPInputStream(Files.newInputStream(fileNameWarc)).transferTo(System.out);
     }

     @Test
     public void testSaveImport() throws URISyntaxException, IOException {
-        try (var recorder = new WarcRecorder(fileName)) {
+        try (var recorder = new WarcRecorder(fileNameWarc)) {
             recorder.flagAsSkipped(new EdgeUrl("https://www.marginalia.nu/"),
                     "text/html",
                     200,
                     "test");
         }

-        try (var reader = new WarcReader(fileName)) {
+        try (var reader = new WarcReader(fileNameWarc)) {
             WarcXResponseReference.register(reader);

             for (var record : reader) {
@@ -114,4 +118,30 @@ class WarcRecorderTest {

     }

+    @Test
+    public void testConvertToParquet() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
+        client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/")
+                .addHeader("User-agent", "test.marginalia.nu")
+                .addHeader("Accept-Encoding", "gzip")
+                .get().build());
+        client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/log/")
+                .addHeader("User-agent", "test.marginalia.nu")
+                .addHeader("Accept-Encoding", "gzip")
+                .get().build());
+        client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/sanic.png")
+                .addHeader("User-agent", "test.marginalia.nu")
+                .addHeader("Accept-Encoding", "gzip")
+                .get().build());
+        client.close();
+
+        CrawledDocumentParquetRecordFileWriter.convertWarc("www.marginalia.nu", fileNameWarc, fileNameParquet);
+
+        var urls = CrawledDocumentParquetRecordFileReader.stream(fileNameParquet).map(doc -> doc.url).toList();
+        assertEquals(3, urls.size());
+        assertEquals("https://www.marginalia.nu/", urls.get(0));
+        assertEquals("https://www.marginalia.nu/log/", urls.get(1));
+        assertEquals("https://www.marginalia.nu/sanic.png", urls.get(2));
+
+    }
+
 }
\ No newline at end of file
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java
index 4590dde2..0873924f 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/HttpFetcherTest.java
@@ -36,7 +36,7 @@ class HttpFetcherTest {
         var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
         try (var recorder = new WarcRecorder()) {
             var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty());
-            if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) {
+            if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok<String> bodyOk) {
                 System.out.println(bodyOk.contentType());
             }
         }
@@ -48,7 +48,7 @@ class HttpFetcherTest {
         try (var recorder = new WarcRecorder()) {
             var result = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty());
-            if (DocumentBodyExtractor.extractBody(result) instanceof DocumentBodyResult.Ok bodyOk) {
+            if (DocumentBodyExtractor.asString(result) instanceof DocumentBodyResult.Ok<String> bodyOk) {
                 System.out.println(bodyOk.contentType());
             }
         }
diff --git a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
index b7727022..749b821c 100644
--- a/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
+++ b/code/processes/crawling-process/src/test/java/nu/marginalia/crawling/retreival/CrawlerMockFetcherTest.java
@@ -127,6 +127,7 @@ public class CrawlerMockFetcherTest {
                 url.asURI(),
                 200,
                 new Headers.Builder().build(),
+                "127.0.0.1",
                 bodyBytes,
                 0,
                 bodyBytes.length