(converter) WIP Run sideload-style processing for large domains

The processor normally retains the domain data in memory after processing to be able to do additional site-wide analysis.   This works well, except there are a number of outlier websites that have an absurd number of documents that can rapidly fill up the heap of the process.

These websites now receive a simplified treatment.  This is executed in the converter batch writer thread.  This is slower, but the documents will not be persisted in memory.
This commit is contained in:
Viktor Lofgren 2023-12-27 18:20:03 +01:00
parent acf7bcc7a6
commit 24051fec03
10 changed files with 208 additions and 29 deletions

View File

@ -6,9 +6,9 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.storage.FileStorageService;
@ -109,7 +109,7 @@ public class ConverterMain {
taskHeartbeat.progress(sideloadSource.domainName(), i++, sideloadSources.size());
writer.write(sideloadSource);
writer.writeSideloadSource(sideloadSource);
}
taskHeartbeat.progress("Finished", i, sideloadSources.size());
@ -139,8 +139,8 @@ public class ConverterMain {
{
pool.submit(() -> {
try {
ProcessedDomain processed = processor.process(domain);
converterWriter.accept(processed);
ConverterBatchWritableIf writable = processor.createWritable(domain);
converterWriter.accept(writable);
}
catch (Exception ex) {
logger.info("Error in processing", ex);

View File

@ -1,15 +1,18 @@
package nu.marginalia.converting.model;
import lombok.ToString;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.crawl.DomainIndexingState;
import org.jetbrains.annotations.Nullable;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
@ToString
public class ProcessedDomain {
public class ProcessedDomain implements ConverterBatchWritableIf {
public EdgeDomain domain;
public List<ProcessedDocument> documents;
@ -26,4 +29,17 @@ public class ProcessedDomain {
public int size() {
return Optional.ofNullable(documents).map(List::size).orElse(1);
}
@Override
public void write(ConverterBatchWriter writer) throws IOException {
writer.writeDomainData(this);
}
@Override
public String id() {
return domain.toString();
}
@Override
public void close() {}
}

View File

@ -8,6 +8,9 @@ import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.processor.logic.links.LinkGraph;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.writer.ConverterBatchWritableIf;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.*;
import nu.marginalia.geoip.GeoIpDictionary;
@ -17,11 +20,15 @@ import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.converting.processor.logic.links.TopKeywords;
import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator;
import nu.marginalia.util.ProcessingIterator;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
import java.util.regex.Pattern;
@ -33,6 +40,11 @@ public class DomainProcessor {
private final AnchorTextKeywords anchorTextKeywords;
private final GeoIpDictionary geoIpDictionary;
// The threshold for running a cheaper sideloading-style process
// (10 MB is ~ 99.5%th percentile of domain data sizes)
private static final long DOMAIN_SIDELOAD_THRESHOLD = 10_000_000L;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
@ -51,9 +63,130 @@ public class DomainProcessor {
geoIpDictionary.waitReady();
}
public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) throws IOException {
Path filePath = domain.path();
if (filePath != null && Files.size(filePath) > DOMAIN_SIDELOAD_THRESHOLD) {
// If the file is too big, we run a processing mode that doesn't
// require loading the entire dataset into RAM
return sideloadProcessing(domain);
}
return fullProcessing(domain);
}
public ConverterBatchWritableIf sideloadProcessing(SerializableCrawlDataStream dataStream) {
try {
return new SideloadProcessing(dataStream);
}
catch (Exception ex) {
logger.warn("Failed to process domain sideload", ex);
return null;
}
}
class SideloadProcessing implements ConverterBatchWritableIf, SideloadSource {
private final SerializableCrawlDataStream dataStream;
private final ProcessedDomain domain;
private final DocumentDecorator documentDecorator;
private final Set<String> processedUrls = new HashSet<>();
private final DomainLinks externalDomainLinks;
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
SideloadProcessing(SerializableCrawlDataStream dataStream) throws IOException {
this.dataStream = dataStream;
if (!dataStream.hasNext()) {
throw new IllegalStateException("No data in stream");
}
if (!(dataStream.next() instanceof CrawledDomain crawledDomain)) {
throw new IllegalStateException("First record must be a domain");
}
domain = new ProcessedDomain();
externalDomainLinks = anchorTagsSource.getAnchorTags(domain.domain);
documentDecorator = new DocumentDecorator(anchorTextKeywords, externalDomainLinks);
processDomain(crawledDomain, domain, documentDecorator);
}
@Override
public ProcessedDomain getDomain() {
return domain;
}
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
return new DocumentsIterator();
}
class DocumentsIterator implements Iterator<ProcessedDocument> {
ProcessedDocument next = null;
@Override
public boolean hasNext() {
try {
while (next != null
&& dataStream.hasNext()
&& dataStream.next() instanceof CrawledDocument doc)
{
if (doc.url == null || !processedUrls.add(doc.url))
continue;
var processedDoc = documentProcessor.process(doc, externalDomainLinks, documentDecorator);
deduplicator.markIfDuplicate(processedDoc);
next = processedDoc;
if (processedDoc.isProcessedFully()) {
// This is a bit sketchy, but we need to set the size and topology to something
processedDoc.details.metadata = processedDoc.details.metadata.withSizeAndTopology(
10_000, externalDomainLinks.countForUrl(processedDoc.url));
}
return true;
}
}
catch (IOException ex) {
logger.warn("Failed to process domain sideload", ex);
}
return false;
}
@Override
public ProcessedDocument next() {
try {
if (next == null && !hasNext())
throw new NoSuchElementException();
return next;
} finally {
next = null;
}
}
}
@Override
public void write(ConverterBatchWriter writer) throws IOException {
writer.writeSideloadSource(this);
}
@Override
public String id() {
return domain.domain.toString();
}
@Override
public void close() throws Exception {
dataStream.close();
deduplicator.close();
}
}
@SneakyThrows
@Nullable
public ProcessedDomain process(SerializableCrawlDataStream dataStream) {
public ProcessedDomain fullProcessing(SerializableCrawlDataStream dataStream) {
if (!dataStream.hasNext()) {
return null;
}
@ -83,8 +216,7 @@ public class DomainProcessor {
if (data instanceof CrawledDomain crawledDomain) {
documentDecorator = new DocumentDecorator(anchorTextKeywords, externalDomainLinks);
ret = processDomain(crawledDomain, ret, documentDecorator);
processDomain(crawledDomain, ret, documentDecorator);
ret.documents = docs;
} else if (data instanceof CrawledDocument doc) {
@ -112,25 +244,23 @@ public class DomainProcessor {
return ret;
}
private ProcessedDomain processDomain(CrawledDomain crawledDomain,
ProcessedDomain ret,
private void processDomain(CrawledDomain crawledDomain,
ProcessedDomain domain,
DocumentDecorator decorator)
{
ret.domain = new EdgeDomain(crawledDomain.domain);
ret.ip = crawledDomain.ip;
domain.domain = new EdgeDomain(crawledDomain.domain);
domain.ip = crawledDomain.ip;
addIpInfo(decorator, crawledDomain.ip);
if (isAcademicDomain(ret.domain)) {
if (isAcademicDomain(domain.domain)) {
decorator.addTerm("special:academia");
}
if (crawledDomain.redirectDomain != null) {
ret.redirect = new EdgeDomain(crawledDomain.redirectDomain);
domain.redirect = new EdgeDomain(crawledDomain.redirectDomain);
}
ret.state = getState(crawledDomain.crawlerStatus);
return ret;
domain.state = getState(crawledDomain.crawlerStatus);
}
@ -232,4 +362,5 @@ public class DomainProcessor {
};
}
}

View File

@ -0,0 +1,9 @@
package nu.marginalia.converting.writer;
import java.io.IOException;
public interface ConverterBatchWritableIf {
void write(ConverterBatchWriter writer) throws IOException;
String id();
void close() throws Exception;
}

View File

@ -27,7 +27,7 @@ import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
/** Writer for a single batch of converter parquet files */
public class ConverterBatchWriter implements AutoCloseable {
public class ConverterBatchWriter implements AutoCloseable, ConverterBatchWriterIf {
private final DomainRecordParquetFileWriter domainWriter;
private final DomainLinkRecordParquetFileWriter domainLinkWriter;
private final DocumentRecordParquetFileWriter documentWriter;
@ -46,7 +46,13 @@ public class ConverterBatchWriter implements AutoCloseable {
);
}
public void write(SideloadSource sideloadSource) throws IOException {
@Override
public void write(ConverterBatchWritableIf writable) throws IOException {
writable.write(this);
}
@Override
public void writeSideloadSource(SideloadSource sideloadSource) throws IOException {
var domain = sideloadSource.getDomain();
writeDomainData(domain);
@ -54,7 +60,8 @@ public class ConverterBatchWriter implements AutoCloseable {
writeDocumentData(domain.domain, sideloadSource.getDocumentsStream());
}
public void write(ProcessedDomain domain) {
@Override
public void writeProcessedDomain(ProcessedDomain domain) {
var results = ForkJoinPool.commonPool().invokeAll(
writeTasks(domain)
);
@ -180,7 +187,7 @@ public class ConverterBatchWriter implements AutoCloseable {
return this;
}
private Object writeDomainData(ProcessedDomain domain) throws IOException {
public Object writeDomainData(ProcessedDomain domain) throws IOException {
DomainMetadata metadata = DomainMetadata.from(domain);
List<String> feeds = getFeedUrls(domain);

View File

@ -0,0 +1,15 @@
package nu.marginalia.converting.writer;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import java.io.IOException;
public interface ConverterBatchWriterIf {
void write(ConverterBatchWritableIf writable) throws IOException;
void writeSideloadSource(SideloadSource sideloadSource) throws IOException;
void writeProcessedDomain(ProcessedDomain domain);
}

View File

@ -24,7 +24,7 @@ public class ConverterWriter implements AutoCloseable {
private final Duration switchInterval
= Duration.of(10, ChronoUnit.MINUTES);
private final ArrayBlockingQueue<ProcessedDomain> domainData
private final ArrayBlockingQueue<ConverterBatchWritableIf> domainData
= new ArrayBlockingQueue<>(1);
private final Thread workerThread;
@ -42,7 +42,7 @@ public class ConverterWriter implements AutoCloseable {
}
@SneakyThrows
public void accept(@Nullable ProcessedDomain domain) {
public void accept(@Nullable ConverterBatchWritableIf domain) {
if (null == domain)
return;
@ -66,10 +66,11 @@ public class ConverterWriter implements AutoCloseable {
if (data == null)
continue;
String id = data.domain.toString();
String id = data.id();
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
logger.warn("Skipping already logged item {}", id);
data.close();
continue;
}

View File

@ -43,7 +43,7 @@ public class ConvertingIntegrationTest {
var domain = new CrawledDomain("memex.marginalia.nu", null, "OK", "-", "127.0.0.1",
docs, Collections.emptyList());
var ret = domainProcessor.process(asSerializableCrawlData(domain));
var ret = domainProcessor.fullProcessing(asSerializableCrawlData(domain));
assertEquals(ret.state, DomainIndexingState.ACTIVE);
assertEquals(ret.domain, new EdgeDomain("memex.marginalia.nu"));
@ -51,7 +51,7 @@ public class ConvertingIntegrationTest {
}
@Test
public void testMemexMarginaliaNuDateInternalConsistency() throws IOException {
var ret = domainProcessor.process(asSerializableCrawlData(readMarginaliaWorkingSet()));
var ret = domainProcessor.fullProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()));
ret.documents.stream().filter(ProcessedDocument::isProcessedFully).forEach(doc -> {
int year = PubDate.fromYearByte(doc.details.metadata.year());
Integer yearMeta = doc.details.pubYear;
@ -64,7 +64,7 @@ public class ConvertingIntegrationTest {
@Test
public void testMemexMarginaliaNu() throws IOException {
var ret = domainProcessor.process(asSerializableCrawlData(readMarginaliaWorkingSet()));
var ret = domainProcessor.fullProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()));
assertNotNull(ret);
assertEquals(ret.state, DomainIndexingState.ACTIVE);
assertEquals(ret.domain, new EdgeDomain("memex.marginalia.nu"));

View File

@ -251,7 +251,7 @@ public class CrawlingThenConvertingIntegrationTest {
private ProcessedDomain process() {
try (var stream = new ParquetSerializableCrawlDataStream(fileName2)) {
return domainProcessor.process(stream);
return domainProcessor.fullProcessing(stream);
}
catch (Exception e) {
Assertions.fail(e);

View File

@ -22,7 +22,7 @@ public class SiteStatisticsExperiment extends Experiment {
@Override
public boolean process(SerializableCrawlDataStream stream) {
var ret = domainProcessor.process(stream);
var ret = domainProcessor.fullProcessing(stream);
ret.documents.stream()
.filter(ProcessedDocument::isProcessedFully)