(converter) Better use of ProcessingIterator

Modify ProcessingIterator to be constructed via a factory, enabling re-use of its backing executor service.

This reduces thread churn in the converter's sideloader-style processing of regular crawl data.
This commit is contained in:
Viktor Lofgren 2023-12-30 13:53:55 +01:00
parent 70c83b60a1
commit 7a1d20ed0a
4 changed files with 38 additions and 10 deletions

View File

@ -26,16 +26,20 @@ public class ProcessingIterator<T> implements Iterator<T> {
private final int parallelism;
public ProcessingIterator(int queueSize, int parallelism, ProcessingJob<T> task) {
ProcessingIterator(ExecutorService executorService, int queueSize, int parallelism, ProcessingJob<T> task) {
this.parallelism = parallelism;
queue = new LinkedBlockingQueue<>(queueSize);
executorService = Executors.newFixedThreadPool(parallelism);
this.executorService = executorService;
sem = new Semaphore(parallelism);
executorService.submit(() -> executeJob(task));
}
/**
 * Builds a {@link Factory} that hands out iterators sharing one executor.
 *
 * @param queueSize   capacity passed on to each created iterator's result queue
 * @param parallelism passed on to each created iterator and used by the
 *                    factory to size its thread pool
 * @return a new factory; the caller decides when to {@code stop()} it
 */
public static Factory factory(int queueSize, int parallelism) {
    final Factory sharedFactory = new Factory(queueSize, parallelism);
    return sharedFactory;
}
private void executeJob(ProcessingJob<T> job) {
try {
job.run(this::executeTask);
@ -83,10 +87,6 @@ public class ProcessingIterator<T> implements Iterator<T> {
}
} while (expectMore());
if (!executorService.isShutdown()) {
executorService.shutdown();
}
return false;
}
@ -128,14 +128,39 @@ public class ProcessingIterator<T> implements Iterator<T> {
* performed in parallel
*/
@FunctionalInterface
public interface ProcessingJob<T2> {
    /**
     * Runs the job, handing each unit of work to {@code output} as a {@link Task}.
     *
     * @param output sink that accepts the tasks this job produces
     * @throws Exception if the job fails while producing tasks
     */
    void run(Consumer<Task<T2>> output) throws Exception;
}
/**
 * A single task that produces a result to be iterable via the Iterator interface
 * (along with other tasks' outputs).
 *
 * <p>Intended to be supplied as a lambda, hence the functional-interface
 * annotation: the compiler rejects any change that adds a second abstract method.
 */
@FunctionalInterface
public interface Task<T> {
    /**
     * Computes this task's result.
     *
     * @return the value produced by the task
     * @throws Exception if the computation fails
     */
    T get() throws Exception;
}
/**
 * Creates {@link ProcessingIterator} instances that all run on a single
 * fixed-size thread pool owned by this factory, so the pool is re-used
 * across iterators instead of being created once per iterator.
 */
public static class Factory {
    private final int queueSize;
    private final int parallelism;
    private final ExecutorService executorService;

    /**
     * @param queueSize   queue size forwarded to every iterator created here
     * @param parallelism number of threads in the shared pool; also forwarded
     *                    to every iterator created here
     */
    Factory(int queueSize, int parallelism) {
        this.executorService = Executors.newFixedThreadPool(parallelism);
        this.parallelism = parallelism;
        this.queueSize = queueSize;
    }

    /**
     * Creates an iterator over the results of {@code task}, backed by this
     * factory's shared executor.
     *
     * @param task the job whose task outputs the iterator will yield
     * @param <T>  element type of the resulting iterator
     * @return a new iterator using this factory's queue size and parallelism
     */
    public <T> ProcessingIterator<T> create(ProcessingJob<T> task) {
        final ProcessingIterator<T> iterator =
                new ProcessingIterator<>(executorService, queueSize, parallelism, task);
        return iterator;
    }

    /**
     * Shuts down the shared executor unless it has already been shut down.
     * Calling this more than once is harmless.
     */
    public void stop() {
        if (executorService.isShutdown()) {
            return;
        }
        executorService.shutdown();
    }
}
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.util;
import org.junit.jupiter.api.Test;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@ -14,7 +15,7 @@ class ProcessingIteratorTest {
@Test
public void test() {
Set<Integer> output = new HashSet<>();
var iter = new ProcessingIterator<Integer>(2, 2, q -> {
Iterator<Integer> iter = ProcessingIterator.factory(2, 2).create(q -> {
for (int i = 0; i < 10_000; i++) {
int j = i;
q.accept(() -> task(j));

View File

@ -86,6 +86,7 @@ public class DomainProcessor {
private final Set<String> processedUrls = new HashSet<>();
private final DomainLinks externalDomainLinks;
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
// One factory (queue size 24, parallelism 16) shared by all SideloadProcessing
// instances so their document streams re-use the same thread pool. Declared
// final so the shared pool cannot be swapped out and orphaned by reassignment.
// NOTE(review): no call to iteratorFactory.stop() is visible in this view — confirm
// whether the pool is meant to live for the whole process.
private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(24, 16);
SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException {
this.dataStream = dataStream;
@ -112,7 +113,7 @@ public class DomainProcessor {
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
return new ProcessingIterator<>(24, 16, (taskConsumer) -> {
return iteratorFactory.create((taskConsumer) -> {
while (dataStream.hasNext())
{
if (!(dataStream.next() instanceof CrawledDocument doc))

View File

@ -76,7 +76,8 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
@SneakyThrows
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
return new ProcessingIterator<>(24, 16, (taskConsumer) -> {
// This leaks a thread pool, but it doesn't matter since this is a one-off process
return ProcessingIterator.factory(24, 16).create((taskConsumer) -> {
DomainLinks domainLinks = getDomainLinks();
var stmt = connection.prepareStatement("""