(crawl data) Add compatibility layer for old crawl data format
The new converter logic assumes that crawl data is ordered with the domain record first, followed by a sequence of document records. This holds for the new parquet format, but not for the old zstd/gson format. To make the new converter compatible with the old format, a specialized reader is introduced that scans for the domain record before running through the sequence of document records, presenting them in the new order. This is slower than just reading the file from beginning to end, so to retain performance when this ordering isn't necessary, a CompatibilityLevel flag is added to CrawledDomainReader, letting the caller decide how compatible the data needs to be. Down the line, when all the old data has been purged, this should be removed, as it amounts to technical debt.
parent edc1acbb7e
commit e49ba887e9
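For illustration, a minimal sketch of how a caller might pick a compatibility level with the new API. The createDataStream overload and the CompatibilityLevel constants are the ones added in this commit; the surrounding class, method names, and record handling are hypothetical placeholders.

import nu.marginalia.crawling.io.CrawledDomainReader;

import java.nio.file.Path;

class CompatibilityLevelSketch {
    // Conversion relies on domain-record-first ordering, so it asks for COMPATIBLE;
    // for old .zstd data this selects the slower reader that scans for the domain record first.
    static void convertLike(Path crawlData) throws Exception {
        try (var stream = CrawledDomainReader.createDataStream(
                CrawledDomainReader.CompatibilityLevel.COMPATIBLE, crawlData)) {
            while (stream.hasNext()) {
                var item = stream.next(); // domain record arrives first, then the documents
            }
        }
    }

    // A tool that only inspects records and does not care about ordering asks for FAST (or ANY)
    // and reads an old-format file front to back, as fast as before.
    static void exportLike(Path crawlData) throws Exception {
        try (var stream = CrawledDomainReader.createDataStream(
                CrawledDomainReader.CompatibilityLevel.FAST, crawlData)) {
            while (stream.hasNext()) {
                stream.next();
            }
        }
    }
}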
@@ -1,7 +1,8 @@
package nu.marginalia.crawling.io;

import com.google.gson.Gson;
import nu.marginalia.crawling.io.format.LegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.CompatibleLegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.FastLegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.model.gson.GsonFactory;
@@ -15,11 +16,24 @@ public class CrawledDomainReader {
    public CrawledDomainReader() {
    }

    public enum CompatibilityLevel {
        /** Data order emulates the ordering of the new format. This is slower */
        COMPATIBLE,
        /** Data order is not compatible with the new format, but the data itself is */
        FAST,
        /** Alias for FAST */
        ANY
    }
    /** An iterator-like access to domain data This must be closed otherwise it will leak off-heap memory! */
    public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException {
    public static SerializableCrawlDataStream createDataStream(CompatibilityLevel compatibilityLevel,
                                                                Path fullPath) throws IOException
    {
        String fileName = fullPath.getFileName().toString();
        if (fileName.endsWith(".zstd")) {
            return new LegacySerializableCrawlDataStream(gson, fullPath.toFile());
            if (compatibilityLevel == CompatibilityLevel.COMPATIBLE)
                return new CompatibleLegacySerializableCrawlDataStream(gson, fullPath.toFile());
            else // if (compatibilityLevel == CompatibilityLevel.FAST or ANY)
                return new FastLegacySerializableCrawlDataStream(gson, fullPath.toFile());
        }
        else if (fileName.endsWith(".parquet")) {
            return new ParquetSerializableCrawlDataStream(fullPath);
@@ -30,14 +44,14 @@ public class CrawledDomainReader {
    }

    /** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
    public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
    public static SerializableCrawlDataStream createDataStream(CompatibilityLevel level, Path basePath, String domain, String id) throws IOException {
        Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);

        if (Files.exists(parquetPath)) {
            return createDataStream(parquetPath);
            return createDataStream(level, parquetPath);
        }
        else {
            return createDataStream(CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain));
            return createDataStream(level, CrawlerOutputFile.getLegacyOutputFile(basePath, id, domain));
        }
    }
@@ -0,0 +1,107 @@
package nu.marginalia.crawling.io.format;

import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;

import java.io.*;
import java.nio.file.Path;
import java.util.Objects;

import static java.util.Objects.*;

/** This class is used to read the old format of crawl data, which was zstd-compressed JSON
 * with type delimiters between records. It does its best to preserve the semantics of the
 * new format. This is slow.
 */
public class CompatibleLegacySerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
    private final Gson gson;
    private final BufferedReader bufferedReader;

    private CrawledDomain domain;
    private SerializableCrawlData next;

    private final Path path;
    public CompatibleLegacySerializableCrawlDataStream(Gson gson, File file) throws IOException {
        this.gson = gson;
        path = file.toPath();
        domain = findDomain(file);

        bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
    }

    /** Scan through the file and find the domain record */
    private CrawledDomain findDomain(File file) throws IOException {
        try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)))) {
            for (;;) {
                String identifierLine =
                        requireNonNull(br.readLine(), "No identifier line found");
                String dataLine =
                        requireNonNull(br.readLine(), "No data line found");

                if (identifierLine.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
                    return gson.fromJson(dataLine, CrawledDomain.class);
                }
            }
        }
    }

    @Override
    public Path path() {
        return path;
    }

    @Override
    public SerializableCrawlData next() throws IOException {
        if (hasNext()) {
            if (domain != null) {
                var ret = domain;
                domain = null;
                return ret;
            }
            else {
                var ret = next;
                next = null;
                return ret;
            }
        }
        throw new IllegalStateException("No more data");
    }

    @Override
    public boolean hasNext() throws IOException {
        if (domain != null || next != null) {
            return true;
        }

        String identifier = bufferedReader.readLine();
        if (identifier == null) {
            bufferedReader.close();
            return false;
        }
        String data = bufferedReader.readLine();
        if (data == null) {
            bufferedReader.close();
            return false;
        }

        if (identifier.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
            next = null;
            return false; // last record is expected to be the domain, so we're done
        } else if (identifier.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
            next = gson.fromJson(data, CrawledDocument.class);
        } else {
            throw new IllegalStateException("Unknown identifier: " + identifier);
        }
        return true;
    }

    @Override
    public void close() throws Exception {
        bufferedReader.close();
    }
}
@@ -12,15 +12,16 @@ import java.io.*;
import java.nio.file.Path;

/** This class is used to read the old format of crawl data, which was zstd-compressed JSON
 * with type delimiters between records.
 * with type delimiters between records. It does not preserve the semantics of the new format,
 * but it is faster.
 */
public class LegacySerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
public class FastLegacySerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
    private final Gson gson;
    private final BufferedReader bufferedReader;
    private SerializableCrawlData next = null;

    private final Path path;
    public LegacySerializableCrawlDataStream(Gson gson, File file) throws IOException {
    public FastLegacySerializableCrawlDataStream(Gson gson, File file) throws IOException {
        this.gson = gson;
        bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
        path = file.toPath();
@@ -95,7 +95,7 @@ public class CrawlPlan {
        }

        try {
            return Optional.of(CrawledDomainReader.createDataStream(path));
            return Optional.of(CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.COMPATIBLE, path));
        }
        catch (IOException ex) {
            return Optional.empty();
@@ -272,7 +272,7 @@ public class CrawlerMain {

    private CrawlDataReference getReference() {
        try {
            return new CrawlDataReference(CrawledDomainReader.createDataStream(outputDir, domain, id));
            return new CrawlDataReference(CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, outputDir, domain, id));
        } catch (IOException e) {
            logger.debug("Failed to read previous crawl data for {}", specification.domain);
            return new CrawlDataReference();
@@ -182,7 +182,7 @@ class CrawlerRetreiverTest {

        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    data.add(doc);
@@ -227,7 +227,7 @@ class CrawlerRetreiverTest {
        doCrawl(tempFileWarc1, specs);
        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDocument doc) {
                    data.add(doc);
@@ -274,7 +274,7 @@ class CrawlerRetreiverTest {
        doCrawl(tempFileWarc1, specs);
        convertToParquet(tempFileWarc1, tempFileParquet1);
        doCrawlWithReferenceStream(specs,
                CrawledDomainReader.createDataStream(tempFileParquet1)
                CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)
        );
        convertToParquet(tempFileWarc2, tempFileParquet2);

@@ -295,7 +295,7 @@ class CrawlerRetreiverTest {
            });
        }

        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
        try (var ds = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet2)) {
            while (ds.hasNext()) {
                var doc = ds.next();
                if (doc instanceof CrawledDomain dr) {
@@ -338,7 +338,7 @@ class CrawlerRetreiverTest {

        convertToParquet(tempFileWarc1, tempFileParquet1);

        try (var stream = CrawledDomainReader.createDataStream(tempFileParquet1)) {
        try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1)) {
            while (stream.hasNext()) {
                var doc = stream.next();
                data.computeIfAbsent(doc.getClass(), c -> new ArrayList<>()).add(doc);
@@ -347,7 +347,7 @@ class CrawlerRetreiverTest {
            throw new RuntimeException(e);
        }

        var stream = CrawledDomainReader.createDataStream(tempFileParquet1);
        var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet1);

        System.out.println("---");

@@ -387,7 +387,7 @@ class CrawlerRetreiverTest {
            });
        }

        try (var ds = CrawledDomainReader.createDataStream(tempFileParquet2)) {
        try (var ds = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.ANY, tempFileParquet2)) {
            while (ds.hasNext()) {
                var doc = ds.next();
                if (doc instanceof CrawledDomain dr) {
@@ -76,7 +76,7 @@ public class ExportAtagsActor extends RecordActorPrototype {
            }

            Path crawlDataPath = inputDir.resolve(item.relPath());
            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
            try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, crawlDataPath)) {
                exportLinks(tagWriter, stream);
            }
            catch (Exception ex) {
@@ -60,7 +60,7 @@ public class CrawlDataUnfcker {
            return Optional.empty();
        }

        try (var stream = CrawledDomainReader.createDataStream(file)) {
        try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, file)) {
            while (stream.hasNext()) {
                if (stream.next() instanceof CrawledDomain domain) {
                    return Optional.of(domain);
@@ -4,13 +4,9 @@ import com.google.inject.Guice;
import com.google.inject.Injector;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.tools.experiments.*;
import plan.CrawlPlanLoader;

import java.io.IOException;
import java.nio.file.Path;
@@ -52,7 +48,7 @@ public class ExperimentRunnerMain {
        Path basePath = Path.of(args[0]);
        for (var item : WorkLog.iterable(basePath.resolve("crawler.log"))) {
            Path crawlDataPath = basePath.resolve(item.relPath());
            try (var stream = CrawledDomainReader.createDataStream(crawlDataPath)) {
            try (var stream = CrawledDomainReader.createDataStream(CrawledDomainReader.CompatibilityLevel.FAST, crawlDataPath)) {
                experiment.process(stream);
            }
            catch (Exception ex) {