diff --git a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java index 52c93bb4..523381fa 100644 --- a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java +++ b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/ProcessingIterator.java @@ -26,16 +26,20 @@ public class ProcessingIterator implements Iterator { private final int parallelism; - public ProcessingIterator(int queueSize, int parallelism, ProcessingJob task) { + ProcessingIterator(ExecutorService executorService, int queueSize, int parallelism, ProcessingJob task) { this.parallelism = parallelism; queue = new LinkedBlockingQueue<>(queueSize); - executorService = Executors.newFixedThreadPool(parallelism); + this.executorService = executorService; sem = new Semaphore(parallelism); executorService.submit(() -> executeJob(task)); } + public static Factory factory(int queueSize, int parallelism) { + return new Factory(queueSize, parallelism); + } + private void executeJob(ProcessingJob job) { try { job.run(this::executeTask); @@ -83,10 +87,6 @@ public class ProcessingIterator implements Iterator { } } while (expectMore()); - if (!executorService.isShutdown()) { - executorService.shutdown(); - } - return false; } @@ -128,14 +128,39 @@ public class ProcessingIterator implements Iterator { * performed in parallel */ public interface ProcessingJob { + void run(Consumer> output) throws Exception; } - /** * A single task that produces a result to be iterable via the Iterator interface * (along with other tasks' outputs) */ public interface Task { + T get() throws Exception; } + + public static class Factory { + private final int queueSize; + private final int parallelism; + private final ExecutorService executorService; + + Factory(int queueSize, int parallelism) { + this.queueSize = queueSize; + this.parallelism = parallelism; + this.executorService = Executors.newFixedThreadPool(parallelism); + } + + public ProcessingIterator create(ProcessingJob task) { + return new ProcessingIterator<>(executorService, queueSize, parallelism, task); + } + + public void stop() { + if (!executorService.isShutdown()) { + executorService.shutdown(); + } + } + } + } + diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java b/code/libraries/blocking-thread-pool/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java similarity index 89% rename from code/processes/converting-process/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java rename to code/libraries/blocking-thread-pool/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java index d20b7ddf..dd0f8d14 100644 --- a/code/processes/converting-process/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java +++ b/code/libraries/blocking-thread-pool/src/test/java/nu/marginalia/util/ProcessingIteratorTest.java @@ -3,6 +3,7 @@ package nu.marginalia.util; import org.junit.jupiter.api.Test; import java.util.HashSet; +import java.util.Iterator; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -14,7 +15,7 @@ class ProcessingIteratorTest { @Test public void test() { Set output = new HashSet<>(); - var iter = new ProcessingIterator(2, 2, q -> { + Iterator iter = ProcessingIterator.factory(2, 2).create(q -> { for (int i = 0; i < 10_000; i++) { int j = i; q.accept(() -> task(j)); diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java index a25384a3..630f97f7 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/processor/DomainProcessor.java @@ -86,6 +86,7 @@ public class DomainProcessor { private final Set processedUrls = new HashSet<>(); private final DomainLinks externalDomainLinks; private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator(); + private static ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(24, 16); SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException { this.dataStream = dataStream; @@ -112,7 +113,7 @@ public class DomainProcessor { @Override public Iterator getDocumentsStream() { - return new ProcessingIterator<>(24, 16, (taskConsumer) -> { + return iteratorFactory.create((taskConsumer) -> { while (dataStream.hasNext()) { if (!(dataStream.next() instanceof CrawledDocument doc)) diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java index f0686b4c..3220703a 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/encyclopedia/EncyclopediaMarginaliaNuSideloader.java @@ -76,7 +76,8 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC @SneakyThrows @Override public Iterator getDocumentsStream() { - return new ProcessingIterator<>(24, 16, (taskConsumer) -> { + // This leaks a thread pool, but it doesn't matter since this is a one-off process + return ProcessingIterator.factory(24, 16).create((taskConsumer) -> { DomainLinks domainLinks = getDomainLinks(); var stmt = connection.prepareStatement("""