(converter) Optimize sideload-loading

Use ProcessingIterator to fan out document processing across more cores, instead of doing all of it in the writer thread, where slow single-threaded processing blocks everything else.
This commit is contained in:
Viktor Lofgren 2023-12-29 14:25:48 +01:00
parent b5fc9673d9
commit e7dd28b926

View File

@ -20,6 +20,7 @@ import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.converting.processor.logic.links.TopKeywords;
import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator;
import nu.marginalia.util.ProcessingIterator;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@ -110,26 +111,21 @@ public class DomainProcessor {
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
return new DocumentsIterator();
}
return new ProcessingIterator<>(24, 16, (taskConsumer) -> {
while (dataStream.hasNext())
{
if (!(dataStream.next() instanceof CrawledDocument doc))
continue;
if (doc.url == null || !processedUrls.add(doc.url))
continue;
class DocumentsIterator implements Iterator<ProcessedDocument> {
ProcessedDocument next = null;
@Override
public boolean hasNext() {
try {
while (next == null
&& dataStream.hasNext())
{
if (!(dataStream.next() instanceof CrawledDocument doc))
continue;
if (doc.url == null || !processedUrls.add(doc.url))
continue;
taskConsumer.accept(() -> {
var processedDoc = documentProcessor.process(doc, domain.domain, externalDomainLinks, documentDecorator);
deduplicator.markIfDuplicate(processedDoc);
next = processedDoc;
synchronized (deduplicator) {
deduplicator.markIfDuplicate(processedDoc);
}
if (processedDoc.isProcessedFully()) {
// This is a bit sketchy, but we need to set the size and topology to something
@ -137,26 +133,10 @@ public class DomainProcessor {
10_000, externalDomainLinks.countForUrl(processedDoc.url));
}
return true;
}
return processedDoc;
});
}
catch (IOException ex) {
logger.warn("Failed to process domain sideload", ex);
}
return false;
}
@Override
public ProcessedDocument next() {
try {
if (next == null && !hasNext())
throw new NoSuchElementException();
return next;
} finally {
next = null;
}
}
});
}
@Override