(converter) Refactor EncyclopediaMarginaliaNuSideloader to use ProcessingIterator
Refactored the getDocumentsStream method in EncyclopediaMarginaliaNuSideloader to use the newly extracted ProcessingIterator class, which encapsulates processing a stream of results (e.g. from a database query) in parallel and returning the computed results as an iterator. The iterator was also made more reliable: previous versions of the logic would sometimes deadlock due to false positives in hasNext().
This commit is contained in:
parent
b6511fbfe2
commit
5c46af0edb
@ -0,0 +1,139 @@
|
||||
package nu.marginalia.util;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Iterator;
|
||||
import java.util.NoSuchElementException;
|
||||
import java.util.concurrent.*;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Abstraction for exposing a (typically) read-from-disk -> parallel processing -> sequential output
|
||||
* workflow as an iterator, where the number of tasks is much larger than the number of cores
|
||||
*/
|
||||
public class ProcessingIterator<T> implements Iterator<T> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(ProcessingIterator.class);
|
||||
|
||||
private final LinkedBlockingQueue<T> queue;
|
||||
private final AtomicBoolean isFinished = new AtomicBoolean(false);
|
||||
private final ExecutorService executorService;
|
||||
private final Semaphore sem;
|
||||
|
||||
private T next = null;
|
||||
|
||||
private final int parallelism;
|
||||
|
||||
public ProcessingIterator(int queueSize, int parallelism, ProcessingJob<T> task) {
|
||||
this.parallelism = parallelism;
|
||||
|
||||
queue = new LinkedBlockingQueue<>(queueSize);
|
||||
executorService = Executors.newFixedThreadPool(parallelism);
|
||||
sem = new Semaphore(parallelism);
|
||||
|
||||
executorService.submit(() -> executeJob(task));
|
||||
}
|
||||
|
||||
private void executeJob(ProcessingJob<T> job) {
|
||||
try {
|
||||
job.run(this::executeTask);
|
||||
} catch (Exception e) {
|
||||
logger.warn("Exception while processing", e);
|
||||
} finally {
|
||||
isFinished.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
private void executeTask(Task<T> task) {
|
||||
try {
|
||||
sem.acquire();
|
||||
} catch (InterruptedException e) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
queue.put(task.get());
|
||||
} catch (Exception e) {
|
||||
logger.warn("Exception while processing", e);
|
||||
} finally {
|
||||
sem.release();
|
||||
}
|
||||
}
|
||||
|
||||
/** Returns true if there are more documents to be processed.
|
||||
* This method may block until we are certain this is true.
|
||||
* <p>
|
||||
* This method must be invoked from the same thread that invokes next(),
|
||||
* (or synchronize between the two)
|
||||
*/
|
||||
@Override
|
||||
@SneakyThrows
|
||||
public boolean hasNext() {
|
||||
if (next != null)
|
||||
return true;
|
||||
|
||||
do {
|
||||
next = queue.poll(1, TimeUnit.SECONDS);
|
||||
if (next != null) {
|
||||
return true;
|
||||
}
|
||||
} while (expectMore());
|
||||
|
||||
if (!executorService.isShutdown()) {
|
||||
executorService.shutdown();
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
/** Heuristic for if we should expect more documents to be processed,
|
||||
* _trust but verify_ since we don't run this in an exclusive section
|
||||
* and may get a false positive. We never expect a false negative though.
|
||||
*/
|
||||
private boolean expectMore() {
|
||||
return !isFinished.get() // we are still reading from the database
|
||||
|| !queue.isEmpty() // ... or we have documents in the queue
|
||||
|| sem.availablePermits() < parallelism; // ... or we are still processing documents
|
||||
}
|
||||
|
||||
/** Returns the next document to be processed.
|
||||
* This method may block until we are certain there is a document to be processed.
|
||||
* <p>
|
||||
* This method must be invoked from the same thread that invokes hasNext(),
|
||||
* (or synchronize between the two)
|
||||
* <p>
|
||||
* If this is run after hasNext() returns false, a NoSuchElementException is thrown.
|
||||
*/
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public T next() {
|
||||
if (!hasNext()) {
|
||||
throw new NoSuchElementException();
|
||||
}
|
||||
|
||||
try {
|
||||
return next;
|
||||
}
|
||||
finally {
|
||||
next = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A job that produces a sequence of processing tasks that are to be
|
||||
* performed in parallel
|
||||
*/
|
||||
public interface ProcessingJob<T2> {
|
||||
void run(Consumer<Task<T2>> output) throws Exception;
|
||||
}
|
||||
|
||||
/**
|
||||
* A single task that produces a result to be iterable via the Iterator interface
|
||||
* (along with other tasks' outputs)
|
||||
*/
|
||||
public interface Task<T> {
|
||||
T get() throws Exception;
|
||||
}
|
||||
}
|
@ -15,6 +15,7 @@ import nu.marginalia.converting.sideload.SideloaderProcessing;
|
||||
import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.model.EdgeUrl;
|
||||
import nu.marginalia.model.crawl.DomainIndexingState;
|
||||
import nu.marginalia.util.ProcessingIterator;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -28,11 +29,6 @@ import java.nio.file.Path;
|
||||
import java.sql.*;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
/** This is an experimental sideloader for encyclopedia.marginalia.nu's database;
|
||||
* (which serves as a way of loading wikipedia's zim files without binding to GPL2'd code)
|
||||
@ -80,62 +76,24 @@ public class EncyclopediaMarginaliaNuSideloader implements SideloadSource, AutoC
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public Iterator<ProcessedDocument> getDocumentsStream() {
|
||||
LinkedBlockingQueue<ProcessedDocument> docs = new LinkedBlockingQueue<>(32);
|
||||
AtomicBoolean isFinished = new AtomicBoolean(false);
|
||||
return new ProcessingIterator<>(24, 16, (taskConsumer) -> {
|
||||
DomainLinks domainLinks = getDomainLinks();
|
||||
|
||||
ExecutorService executorService = Executors.newFixedThreadPool(16);
|
||||
Semaphore sem = new Semaphore(16);
|
||||
var stmt = connection.prepareStatement("""
|
||||
SELECT url,title,html FROM articles
|
||||
""");
|
||||
stmt.setFetchSize(100);
|
||||
|
||||
DomainLinks domainLinks = getDomainLinks();
|
||||
var rs = stmt.executeQuery();
|
||||
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
var stmt = connection.prepareStatement("""
|
||||
SELECT url,title,html FROM articles
|
||||
""");
|
||||
stmt.setFetchSize(100);
|
||||
while (rs.next()) {
|
||||
var articleParts = fromCompressedJson(rs.getBytes("html"), ArticleParts.class);
|
||||
String title = rs.getString("title");
|
||||
String url = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8);
|
||||
|
||||
var rs = stmt.executeQuery();
|
||||
while (rs.next()) {
|
||||
var articleParts = fromCompressedJson(rs.getBytes("html"), ArticleParts.class);
|
||||
String title = rs.getString("title");
|
||||
String url = URLEncoder.encode(rs.getString("url"), StandardCharsets.UTF_8);
|
||||
|
||||
sem.acquire();
|
||||
|
||||
executorService.submit(() -> {
|
||||
try {
|
||||
docs.add(convertDocument(articleParts.parts, title, url, domainLinks));
|
||||
} catch (URISyntaxException | DisqualifiedException e) {
|
||||
logger.warn("Problem converting encyclopedia article " + url, e);
|
||||
} finally {
|
||||
sem.release();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
stmt.close();
|
||||
}
|
||||
catch (Exception e) {
|
||||
logger.warn("Problem converting encyclopedia article", e);
|
||||
}
|
||||
finally {
|
||||
isFinished.set(true);
|
||||
taskConsumer.accept(() -> convertDocument(articleParts.parts, title, url, domainLinks));
|
||||
}
|
||||
});
|
||||
|
||||
return new Iterator<>() {
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return !isFinished.get() || !docs.isEmpty() || sem.availablePermits() < 16;
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
@Override
|
||||
public ProcessedDocument next() {
|
||||
return docs.take();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private DomainLinks getDomainLinks() {
|
||||
|
@ -0,0 +1,38 @@
|
||||
package nu.marginalia.util;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
class ProcessingIteratorTest {
|
||||
|
||||
@Test
|
||||
public void test() {
|
||||
Set<Integer> output = new HashSet<>();
|
||||
var iter = new ProcessingIterator<Integer>(2, 2, q -> {
|
||||
for (int i = 0; i < 10_000; i++) {
|
||||
int j = i;
|
||||
q.accept(() -> task(j));
|
||||
}
|
||||
});
|
||||
while (iter.hasNext()) {
|
||||
output.add(iter.next());
|
||||
}
|
||||
|
||||
assertEquals(10_000, output.size());
|
||||
|
||||
for (int i = 0; i < 10_000; i++) {
|
||||
assertTrue(output.contains(i));
|
||||
}
|
||||
}
|
||||
|
||||
int task(int n) throws InterruptedException {
|
||||
TimeUnit.NANOSECONDS.sleep(10);
|
||||
return n;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user