diff --git a/code/process-models/crawling-model/build.gradle b/code/process-models/crawling-model/build.gradle index f1f77a70..03db0de9 100644 --- a/code/process-models/crawling-model/build.gradle +++ b/code/process-models/crawling-model/build.gradle @@ -22,10 +22,12 @@ dependencies { implementation project(':code:common:service-client') implementation project(':code:features-crawl:content-type') implementation project(':code:libraries:language-processing') + implementation project(':third-party:parquet-floor') implementation libs.bundles.slf4j implementation libs.notnull + implementation libs.bundles.parquet implementation libs.jwarc implementation libs.gson diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java new file mode 100644 index 00000000..614be635 --- /dev/null +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java @@ -0,0 +1,87 @@ +package nu.marginalia.crawling.parquet; + +import blue.strategic.parquet.Dehydrator; +import blue.strategic.parquet.Hydrator; +import blue.strategic.parquet.ValueWriter; +import lombok.AllArgsConstructor; +import lombok.EqualsAndHashCode; +import lombok.NoArgsConstructor; +import lombok.ToString; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.Types; + +import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; +import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; + +@AllArgsConstructor +@NoArgsConstructor +@EqualsAndHashCode +@ToString +public class CrawledDocumentParquetRecord { + public String domain; + public String url; + public String ip; + public boolean cookies; + public String contentType; + public byte[] body; + + public static Hydrator newHydrator() { + return new 
CrawledDocumentParquetRecordHydrator(); + } + + public static Dehydrator newDehydrator() { + return CrawledDocumentParquetRecord::dehydrate; + } + + public static MessageType schema = new MessageType( + CrawledDocumentParquetRecord.class.getSimpleName(), + Types.required(BINARY).as(stringType()).named("domain"), + Types.required(BINARY).as(stringType()).named("url"), + Types.required(BINARY).as(stringType()).named("ip"), + Types.required(BOOLEAN).named("cookies"), + Types.required(BINARY).as(stringType()).named("contentType"), + Types.required(BINARY).named("body") + ); + + + public CrawledDocumentParquetRecord add(String heading, Object value) { + switch (heading) { + case "domain" -> domain = (String) value; + case "url" -> url = (String) value; + case "ip" -> ip = (String) value; + case "cookies" -> cookies = (Boolean) value; + case "contentType" -> contentType = (String) value; + case "body" -> body = (byte[]) value; + default -> throw new UnsupportedOperationException("Unknown heading '" + heading + '"'); + } + return this; + } + + public void dehydrate(ValueWriter valueWriter) { + valueWriter.write("domain", domain); + valueWriter.write("url", url); + valueWriter.write("ip", ip); + valueWriter.write("cookies", cookies); + valueWriter.write("contentType", contentType); + valueWriter.write("body", body); + } +} + +class CrawledDocumentParquetRecordHydrator implements Hydrator { + + @Override + public CrawledDocumentParquetRecord start() { + return new CrawledDocumentParquetRecord(); + } + + @Override + public CrawledDocumentParquetRecord add(CrawledDocumentParquetRecord target, String heading, Object value) { + return target.add(heading, value); + } + + @Override + public CrawledDocumentParquetRecord finish(CrawledDocumentParquetRecord target) { + return target; + } + +} diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileReader.java 
package nu.marginalia.crawling.parquet;

import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;

/**
 * Reads {@link CrawledDocumentParquetRecord}s back from a parquet file.
 */
public class CrawledDocumentParquetRecordFileReader {

    /**
     * Streams every record in the given parquet file.
     * <p>
     * NOTE(review): the returned stream presumably holds an open file handle
     * until exhausted; callers should close it (try-with-resources), as the
     * round-trip test does — confirm against parquet-floor's ParquetReader.
     *
     * @param path parquet file to read
     * @return stream of hydrated records
     * @throws IOException if the file cannot be opened
     */
    @NotNull
    public static Stream<CrawledDocumentParquetRecord> stream(Path path) throws IOException {
        return ParquetReader.streamContent(path.toFile(),
                HydratorSupplier.constantly(CrawledDocumentParquetRecord.newHydrator()));
    }

}

/* ---- CrawledDocumentParquetRecordFileWriter.java ---- */

package nu.marginalia.crawling.parquet;

import blue.strategic.parquet.ParquetWriter;

import java.io.IOException;
import java.nio.file.Path;

/**
 * Writes {@link CrawledDocumentParquetRecord}s to a parquet file.
 * <p>
 * Use with try-with-resources: {@link #close()} flushes buffered row groups
 * and writes the parquet footer, without which the file is unreadable.
 */
public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
    private final ParquetWriter<CrawledDocumentParquetRecord> writer;

    /**
     * Opens {@code file} for writing with the record's schema.
     *
     * @param file destination parquet file
     * @throws IOException if the file cannot be created
     */
    public CrawledDocumentParquetRecordFileWriter(Path file) throws IOException {
        writer = ParquetWriter.writeFile(CrawledDocumentParquetRecord.schema,
                file.toFile(), CrawledDocumentParquetRecord.newDehydrator());
    }

    /** Appends one record to the file. */
    public void write(CrawledDocumentParquetRecord domainData) throws IOException {
        writer.write(domainData);
    }

    /** Flushes and finalizes the file; must be called for the file to be valid. */
    public void close() throws IOException {
        writer.close();
    }
}
package nu.marginalia.crawling.parquet;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.*;

/**
 * Round-trip test: a record written to a parquet file must be read back
 * equal (relies on the record's generated equals, including the byte[] body).
 */
class CrawledDocumentParquetRecordFileWriterTest {
    Path tempFile;

    @BeforeEach
    public void setUp() throws IOException {
        tempFile = Files.createTempFile("test", ".parquet");
    }

    @AfterEach
    public void tearDown() throws IOException {
        // deleteIfExists so a test that removes the file itself doesn't
        // mask its real outcome with a NoSuchFileException here.
        Files.deleteIfExists(tempFile);
    }

    @Test
    void write() throws IOException {
        // Explicit charset: getBytes() without one uses the platform default,
        // which makes the fixture environment-dependent.
        var original = new CrawledDocumentParquetRecord("www.marginalia.nu",
                "https://www.marginalia.nu/",
                "127.0.0.1",
                false,
                "text/html",
                "hello world".getBytes(StandardCharsets.UTF_8));

        try (var writer = new CrawledDocumentParquetRecordFileWriter(tempFile)) {
            writer.write(original);
        }

        try (var stream = CrawledDocumentParquetRecordFileReader.stream(tempFile)) {
            var actual = stream.findFirst().orElseThrow();
            assertEquals(original, actual);
        }
    }
}
b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java @@ -13,6 +13,7 @@ import org.apache.parquet.io.DelegatingSeekableInputStream; import org.apache.parquet.io.InputFile; import org.apache.parquet.io.SeekableInputStream; import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.schema.LogicalTypeAnnotation; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.PrimitiveType; @@ -144,7 +145,11 @@ public final class ParquetReader implements Spliterator, Closeable { case BINARY: case FIXED_LEN_BYTE_ARRAY: case INT96: - return primitiveType.stringifier().stringify(columnReader.getBinary()); + if (primitiveType.getLogicalTypeAnnotation() == null) { + return columnReader.getBinary().getBytes(); + } else { + return primitiveType.stringifier().stringify(columnReader.getBinary()); + } case BOOLEAN: return columnReader.getBoolean(); case DOUBLE: diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java index 6e53c189..6d9b5734 100644 --- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java @@ -242,7 +242,7 @@ public final class ParquetWriter implements Closeable { if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) { recordConsumer.addBinary(Binary.fromString((String)value)); } else { - throw new UnsupportedOperationException("We don't support writing logical annotation type " + type.getLogicalTypeAnnotation()); + recordConsumer.addBinary(Binary.fromConstantByteArray((byte[])value)); } break; default: