(converter) Fix bugs in conversion
This commit adds a safety check that the URL of each processed document actually belongs to the domain being processed. It also adds a sizeHint() method to SerializableCrawlDataStream, which *may* indicate that the stream is very large and would benefit from sideload-style processing, a slower mode that avoids loading the entire domain into RAM. It furthermore fixes a bug where ProcessedDomain.write() invoked the wrong method on ConverterBatchWriter, so that only the domain metadata was written and not the rest of the data.
parent 407915a86e
commit dec3b1092d

6 changed files with 58 additions and 16 deletions
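The heart of the change is the new sizeHint() contract shown in the first hunk below: a stream implementation may cheaply estimate how many documents it holds, so the caller can pick a processing strategy before reading anything. A minimal sketch of that decision, where the class name, the chooseMode() helper, and the named threshold constant are illustrative rather than part of the commit:

// Sketch of the sizeHint() contract: 0 means "unknown, or too small to
// matter", so cheap in-memory processing stays the default and sideloading
// only triggers for demonstrably large streams.
class SizeHintSketch {
    static final int SIDELOAD_THRESHOLD = 10_000; // documents, mirroring createWritable() below

    static String chooseMode(int sizeHint) {
        return sizeHint > SIDELOAD_THRESHOLD ? "sideload" : "in-memory";
    }

    public static void main(String[] args) {
        System.out.println(chooseMode(0));      // in-memory: hint unavailable
        System.out.println(chooseMode(50_000)); // sideload: many documents
    }
}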
@@ -17,6 +17,10 @@ public interface SerializableCrawlDataStream extends AutoCloseable {
 
     SerializableCrawlData next() throws IOException;
 
+    /** Return a size hint for the stream. 0 is returned if the hint is not available,
+     * or if the file is deemed too small to bother with */
+    default int sizeHint() { return 0; }
+
     boolean hasNext() throws IOException;
 
     @Nullable
 
@@ -14,6 +14,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URISyntaxException;
+import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.*;
@@ -37,6 +38,21 @@ public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
         return path;
     }
 
+    public int sizeHint() {
+        // Only calculate size hint for large files
+        // (the reason we calculate them in the first place is to assess whether it is large
+        // because it has many documents, or because it is a small number of large documents)
+        try {
+            if (Files.size(path) > 10_000_000) {
+                return CrawledDocumentParquetRecordFileReader.countGoodStatusCodes(path);
+            }
+        }
+        catch (IOException e) {
+            // suppressed
+        }
+
+        return 0;
+    }
+
     @Override
     @SneakyThrows
     public boolean hasNext() {
 
@@ -1,11 +1,13 @@
 package nu.marginalia.crawling.parquet;
 
+import blue.strategic.parquet.Hydrator;
 import blue.strategic.parquet.HydratorSupplier;
 import blue.strategic.parquet.ParquetReader;
 import org.jetbrains.annotations.NotNull;
 
 import java.io.IOException;
 import java.nio.file.Path;
+import java.util.List;
 import java.util.stream.Stream;
 
 public class CrawledDocumentParquetRecordFileReader {
@@ -16,4 +18,25 @@ public class CrawledDocumentParquetRecordFileReader {
                 HydratorSupplier.constantly(CrawledDocumentParquetRecord.newHydrator()));
     }
 
+    /** Count the number of documents with a 200 status code */
+    public static int countGoodStatusCodes(Path path) throws IOException {
+        return ParquetReader.streamContent(path.toFile(),
+                HydratorSupplier.constantly(new Hydrator<Integer, Integer>() {
+                    @Override
+                    public Integer start() { return 0; }
+                    @Override
+                    public Integer add(Integer target, String heading, Object value) {
+                        if ("statusCode".equals(heading) && Integer.valueOf(200).equals(value)) {
+                            return 1;
+                        }
+                        return 0;
+                    }
+                    @Override
+                    public Integer finish(Integer target) { return target; }
+                }),
+                List.of("statusCode"))
+                .mapToInt(Integer::intValue)
+                .sum();
+    }
 }
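Because the hint is only computed for files over 10 MB, and countGoodStatusCodes() streams just the statusCode column, the check stays cheap even on large parquet files. A hypothetical call site, with an illustrative file path:

import java.io.IOException;
import java.nio.file.Path;

import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileReader;

class CountGoodStatusCodesExample {
    public static void main(String[] args) throws IOException {
        Path crawlData = Path.of("crawl-data.parquet"); // illustrative path
        int goodDocs = CrawledDocumentParquetRecordFileReader.countGoodStatusCodes(crawlData);
        System.out.println("Documents with HTTP 200: " + goodDocs);
    }
}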
@@ -32,7 +32,7 @@ public class ProcessedDomain implements ConverterBatchWritableIf {
 
     @Override
     public void write(ConverterBatchWriter writer) throws IOException {
-        writer.writeDomainData(this);
+        writer.writeProcessedDomain(this);
     }
 
     @Override
 
@@ -4,6 +4,7 @@ import com.google.inject.Inject;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawling.model.CrawledDocument;
 import nu.marginalia.crawling.model.CrawlerDocumentStatus;
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawl.HtmlFeature;
 import nu.marginalia.model.crawl.UrlIndexingState;
 import nu.marginalia.converting.model.DisqualifiedException;
@@ -40,6 +41,7 @@ public class DocumentProcessor {
     }
 
     public ProcessedDocument process(CrawledDocument crawledDocument,
+                                     EdgeDomain domain,
                                      DomainLinks externalDomainLinks,
                                      DocumentDecorator documentDecorator) {
         ProcessedDocument ret = new ProcessedDocument();

@@ -48,6 +50,12 @@ public class DocumentProcessor {
         // We must always provide the URL, even if we don't process the document
         ret.url = getDocumentUrl(crawledDocument);
 
+        if (!Objects.equals(ret.url.domain, domain)) {
+            ret.state = UrlIndexingState.DISQUALIFIED;
+            ret.stateReason = DisqualifiedException.DisqualificationReason.PROCESSING_EXCEPTION.toString();
+            return ret;
+        }
+
         DocumentClass documentClass = switch (externalDomainLinks.countForUrl(ret.url)) {
             case 0 -> DocumentClass.NORMAL;
             case 1 -> DocumentClass.EXTERNALLY_LINKED_ONCE;
 
@@ -20,15 +20,12 @@ import nu.marginalia.converting.model.ProcessedDomain;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.converting.processor.logic.links.TopKeywords;
 import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator;
 import nu.marginalia.util.ProcessingIterator;
 import org.apache.commons.lang3.StringUtils;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.sql.SQLException;
 import java.util.*;
 import java.util.regex.Pattern;
@@ -40,11 +37,6 @@ public class DomainProcessor {
     private final AnchorTextKeywords anchorTextKeywords;
     private final GeoIpDictionary geoIpDictionary;
 
-
-    // The threshold for running a cheaper sideloading-style process
-    // (10 MB is ~ 99.5%th percentile of domain data sizes)
-    private static final long DOMAIN_SIDELOAD_THRESHOLD = 10_000_000L;
-
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
     @Inject
@@ -63,12 +55,11 @@ public class DomainProcessor {
         geoIpDictionary.waitReady();
     }
 
-    public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) throws IOException {
-        Path filePath = domain.path();
-
-        if (filePath != null && Files.size(filePath) > DOMAIN_SIDELOAD_THRESHOLD) {
+    public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) {
+        if (domain.sizeHint() > 10_000) {
             // If the file is too big, we run a processing mode that doesn't
             // require loading the entire dataset into RAM
+            logger.info("Sideloading {}", domain.path());
             return sideloadProcessing(domain);
         }
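Note how the two thresholds compose: the parquet stream only computes a hint for files over 10 MB, and the processor only sideloads when that hint exceeds 10,000 documents, i.e. when a file is large because it holds many documents rather than a few big ones. A condensed, standalone sketch of the combined decision (illustrative, not the commit's code):

// Condensed decision table for the new sideloading logic:
//   file <= 10 MB               -> hint is 0     -> in-memory
//   file  > 10 MB, <= 10k docs  -> small hint    -> in-memory
//   file  > 10 MB,  > 10k docs  -> hint > 10,000 -> sideload
class SideloadDecisionSketch {
    static boolean shouldSideload(long fileSizeBytes, int goodDocumentCount) {
        int sizeHint = fileSizeBytes > 10_000_000 ? goodDocumentCount : 0;
        return sizeHint > 10_000;
    }

    public static void main(String[] args) {
        System.out.println(shouldSideload(5_000_000, 40_000));  // false: small file
        System.out.println(shouldSideload(50_000_000, 200));    // false: few, large documents
        System.out.println(shouldSideload(50_000_000, 40_000)); // true: many documents
    }
}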
@@ -100,7 +91,7 @@ public class DomainProcessor {
             if (!dataStream.hasNext()
                 || !(dataStream.next() instanceof CrawledDomain crawledDomain))
             {
-                throw new IllegalStateException("First record must be a domain");
+                throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName());
             }
 
             domain = new ProcessedDomain();

@@ -135,7 +126,7 @@ public class DomainProcessor {
                     if (doc.url == null || !processedUrls.add(doc.url))
                         continue;
 
-                    var processedDoc = documentProcessor.process(doc, externalDomainLinks, documentDecorator);
+                    var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator);
 
                     deduplicator.markIfDuplicate(processedDoc);
                     next = processedDoc;

@@ -226,7 +217,7 @@ public class DomainProcessor {
                     if (doc.url == null || !processedUrls.add(doc.url))
                         continue;
 
-                    var processedDoc = documentProcessor.process(doc, externalDomainLinks, documentDecorator);
+                    var processedDoc = documentProcessor.process(doc, ret.domain, externalDomainLinks, documentDecorator);
 
                     deduplicator.markIfDuplicate(processedDoc);
 