(crawling-model) Implement a parquet format for crawl data

This is not hooked into anything yet. The change also modifies the parquet-floor library to support reading and writing byte[] arrays. This is desirable since we may want to support non-text inputs in the future, and codifying the assumption that each document is a string would definitely cause us grief down the line.
Viktor Lofgren 2023-12-13 16:22:19 +01:00
parent a73f1ab0ac
commit 787a20cbaa
7 changed files with 182 additions and 2 deletions
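
The crux of the parquet-floor change at the bottom of this diff: a BINARY column is hydrated as a String only when it carries a stringType() logical type annotation; an unannotated BINARY column now round-trips as a raw byte[]. A minimal sketch of the distinction (the Example schema is made up for illustration, built with the same parquet-mr types as the record class below):

import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;

class SchemaSketch {
    // Both columns are BINARY on disk; with this change, parquet-floor
    // hydrates the annotated column as String and the bare one as byte[].
    static final MessageType EXAMPLE = new MessageType("Example",
            Types.required(BINARY).as(stringType()).named("url"),  // hydrates to String
            Types.required(BINARY).named("body"));                 // hydrates to byte[]
}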

build.gradle (crawling-model)

@@ -22,10 +22,12 @@ dependencies {
    implementation project(':code:common:service-client')
    implementation project(':code:features-crawl:content-type')
    implementation project(':code:libraries:language-processing')
+   implementation project(':third-party:parquet-floor')
    implementation libs.bundles.slf4j
    implementation libs.notnull
+   implementation libs.bundles.parquet
    implementation libs.jwarc
    implementation libs.gson

nu/marginalia/crawling/parquet/CrawledDocumentParquetRecord.java

@@ -0,0 +1,87 @@
package nu.marginalia.crawling.parquet;

import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import lombok.ToString;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;

import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;

@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode
@ToString
public class CrawledDocumentParquetRecord {
    public String domain;
    public String url;
    public String ip;
    public boolean cookies;
    public String contentType;
    public byte[] body;

    public static Hydrator<CrawledDocumentParquetRecord, CrawledDocumentParquetRecord> newHydrator() {
        return new CrawledDocumentParquetRecordHydrator();
    }

    public static Dehydrator<CrawledDocumentParquetRecord> newDehydrator() {
        return CrawledDocumentParquetRecord::dehydrate;
    }

    public static MessageType schema = new MessageType(
            CrawledDocumentParquetRecord.class.getSimpleName(),
            Types.required(BINARY).as(stringType()).named("domain"),
            Types.required(BINARY).as(stringType()).named("url"),
            Types.required(BINARY).as(stringType()).named("ip"),
            Types.required(BOOLEAN).named("cookies"),
            Types.required(BINARY).as(stringType()).named("contentType"),
            Types.required(BINARY).named("body")
    );

    public CrawledDocumentParquetRecord add(String heading, Object value) {
        switch (heading) {
            case "domain" -> domain = (String) value;
            case "url" -> url = (String) value;
            case "ip" -> ip = (String) value;
            case "cookies" -> cookies = (Boolean) value;
            case "contentType" -> contentType = (String) value;
            case "body" -> body = (byte[]) value;
            default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
        }
        return this;
    }

    public void dehydrate(ValueWriter valueWriter) {
        valueWriter.write("domain", domain);
        valueWriter.write("url", url);
        valueWriter.write("ip", ip);
        valueWriter.write("cookies", cookies);
        valueWriter.write("contentType", contentType);
        valueWriter.write("body", body);
    }
}

class CrawledDocumentParquetRecordHydrator implements Hydrator<CrawledDocumentParquetRecord, CrawledDocumentParquetRecord> {

    @Override
    public CrawledDocumentParquetRecord start() {
        return new CrawledDocumentParquetRecord();
    }

    @Override
    public CrawledDocumentParquetRecord add(CrawledDocumentParquetRecord target, String heading, Object value) {
        return target.add(heading, value);
    }

    @Override
    public CrawledDocumentParquetRecord finish(CrawledDocumentParquetRecord target) {
        return target;
    }
}
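
For orientation, parquet-floor drives the hydrator roughly like this when it materializes a row: start() once per row, add() once per populated column, finish() at the end. A fragment with made-up column values:

var hydrator = CrawledDocumentParquetRecord.newHydrator();
var record = hydrator.start();
record = hydrator.add(record, "domain", "www.marginalia.nu");  // String column
record = hydrator.add(record, "cookies", false);               // boolean column
record = hydrator.add(record, "body", "hello".getBytes());     // byte[] column
record = hydrator.finish(record);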

nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileReader.java

@@ -0,0 +1,19 @@
package nu.marginalia.crawling.parquet;

import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;

public class CrawledDocumentParquetRecordFileReader {

    @NotNull
    public static Stream<CrawledDocumentParquetRecord> stream(Path path) throws IOException {
        return ParquetReader.streamContent(path.toFile(),
                HydratorSupplier.constantly(CrawledDocumentParquetRecord.newHydrator()));
    }
}
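
A usage sketch for the reader (the file name and the filter are hypothetical); the returned stream holds the file open, so it should be closed, as the test further down also does:

try (var docs = CrawledDocumentParquetRecordFileReader.stream(Path.of("crawl-data.parquet"))) {
    docs.filter(doc -> "text/html".equals(doc.contentType))
        .forEach(doc -> System.out.println(doc.url + " (" + doc.body.length + " bytes)"));
}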

nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriter.java

@@ -0,0 +1,23 @@
package nu.marginalia.crawling.parquet;

import blue.strategic.parquet.ParquetWriter;

import java.io.IOException;
import java.nio.file.Path;

public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
    private final ParquetWriter<CrawledDocumentParquetRecord> writer;

    public CrawledDocumentParquetRecordFileWriter(Path file) throws IOException {
        writer = ParquetWriter.writeFile(CrawledDocumentParquetRecord.schema,
                file.toFile(), CrawledDocumentParquetRecord.newDehydrator());
    }

    public void write(CrawledDocumentParquetRecord domainData) throws IOException {
        writer.write(domainData);
    }

    public void close() throws IOException {
        writer.close();
    }
}
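
The matching writer-side sketch (file name again hypothetical); the class implements AutoCloseable so it can sit in a try-with-resources block, since the parquet footer is written when the writer closes:

try (var writer = new CrawledDocumentParquetRecordFileWriter(Path.of("crawl-data.parquet"))) {
    writer.write(new CrawledDocumentParquetRecord(
            "www.marginalia.nu", "https://www.marginalia.nu/", "127.0.0.1",
            false, "text/html", "<!DOCTYPE html>...".getBytes()));
}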

nu/marginalia/crawling/parquet/CrawledDocumentParquetRecordFileWriterTest.java

@@ -0,0 +1,44 @@
package nu.marginalia.crawling.parquet;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

import static org.junit.jupiter.api.Assertions.*;

class CrawledDocumentParquetRecordFileWriterTest {
    Path tempFile;

    @BeforeEach
    public void setUp() throws IOException {
        tempFile = Files.createTempFile("test", ".parquet");
    }

    @AfterEach
    public void tearDown() throws IOException {
        Files.delete(tempFile);
    }

    @Test
    void write() throws IOException {
        var original = new CrawledDocumentParquetRecord("www.marginalia.nu",
                "https://www.marginalia.nu/",
                "127.0.0.1",
                false,
                "text/html",
                "hello world".getBytes());

        try (var writer = new CrawledDocumentParquetRecordFileWriter(tempFile)) {
            writer.write(original);
        }

        try (var stream = CrawledDocumentParquetRecordFileReader.stream(tempFile)) {
            var actual = stream.findFirst().orElseThrow();
            assertEquals(original, actual);
        }
    }
}
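
The test exercises a text body; a hypothetical extra case (not part of this commit) would pin down the non-text motivation from the commit message, checking that arbitrary bytes survive the round trip unchanged:

@Test
void writeBinaryBody() throws IOException {
    // Hypothetical test, same fixture as above: a non-UTF-8 body must
    // round-trip byte for byte, which is the point of storing byte[].
    byte[] payload = { (byte) 0x89, 'P', 'N', 'G', 0, 1, 2, 3 };
    var original = new CrawledDocumentParquetRecord("www.marginalia.nu",
            "https://www.marginalia.nu/logo.png",
            "127.0.0.1",
            false,
            "image/png",
            payload);

    try (var writer = new CrawledDocumentParquetRecordFileWriter(tempFile)) {
        writer.write(original);
    }

    try (var stream = CrawledDocumentParquetRecordFileReader.stream(tempFile)) {
        assertArrayEquals(payload, stream.findFirst().orElseThrow().body);
    }
}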

ParquetReader.java (third-party/parquet-floor)

@@ -13,6 +13,7 @@ import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.api.GroupConverter;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
@@ -144,7 +145,11 @@ public final class ParquetReader<U, S> implements Spliterator<S>, Closeable {
            case BINARY:
            case FIXED_LEN_BYTE_ARRAY:
            case INT96:
-               return primitiveType.stringifier().stringify(columnReader.getBinary());
+               if (primitiveType.getLogicalTypeAnnotation() == null) {
+                   return columnReader.getBinary().getBytes();
+               } else {
+                   return primitiveType.stringifier().stringify(columnReader.getBinary());
+               }
            case BOOLEAN:
                return columnReader.getBoolean();
            case DOUBLE:

ParquetWriter.java (third-party/parquet-floor)

@@ -242,7 +242,7 @@ public final class ParquetWriter<T> implements Closeable {
                if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
                    recordConsumer.addBinary(Binary.fromString((String)value));
                } else {
-                   throw new UnsupportedOperationException("We don't support writing logical annotation type " + type.getLogicalTypeAnnotation());
+                   recordConsumer.addBinary(Binary.fromConstantByteArray((byte[])value));
                }
                break;
            default: