(converter) Use SimpleBlockingThreadPool in ProcessingIterator

This commit is contained in:
Viktor Lofgren 2024-01-03 13:40:44 +01:00
parent 3caa4eed75
commit 4ce692ccaf
3 changed files with 36 additions and 32 deletions

View File

@ -80,6 +80,14 @@ public class ResultValuator {
temporalBias = 0;
}
logger.info("averageSentenceLengthPenalty: " + averageSentenceLengthPenalty);
logger.info("documentLengthPenalty: " + documentLengthPenalty);
logger.info("qualityPenalty: " + qualityPenalty);
logger.info("rankingBonus: " + rankingBonus);
logger.info("topologyBonus: " + topologyBonus);
logger.info("temporalBias: " + temporalBias);
logger.info("flagsPenalty: " + flagsPenalty);
double overallPart = averageSentenceLengthPenalty
+ documentLengthPenalty
+ qualityPenalty
@ -112,9 +120,22 @@ public class ResultValuator {
double overallPartPositive = Math.max(0, overallPart);
double overallPartNegative = -Math.min(0, overallPart);
logger.info("bestTcf: " + bestTcf);
logger.info("bestBM25F: " + bestBM25F);
logger.info("bestBM25P: " + bestBM25P);
logger.info("bestBM25PN: " + bestBM25PN);
logger.info("overallPartPositive: " + overallPartPositive);
logger.info("overallPartNegative: " + overallPartNegative);
// Renormalize to 0...15, where 0 is the best possible score;
// this is a historical artifact of the original ranking function
return normalize(1.5 * bestTcf + bestBM25F + bestBM25P + 0.25 * bestBM25PN + overallPartPositive, overallPartNegative);
double ret = normalize(1.5 * bestTcf + bestBM25F + bestBM25P + 0.25 * bestBM25PN + overallPartPositive, overallPartNegative);
logger.info("ret: " + ret);
return ret;
}
private double calculateQualityPenalty(int size, int quality, ResultRankingParameters rankingParams) {

View File

@ -19,21 +19,16 @@ public class ProcessingIterator<T> implements Iterator<T> {
private final LinkedBlockingQueue<T> queue;
private final AtomicBoolean isFinished = new AtomicBoolean(false);
private final ExecutorService executorService;
private final Semaphore sem;
private final SimpleBlockingThreadPool pool;
private T next = null;
private final int parallelism;
ProcessingIterator(ExecutorService executorService, int queueSize, int parallelism, ProcessingJob<T> task) {
this.parallelism = parallelism;
@SneakyThrows
ProcessingIterator(SimpleBlockingThreadPool pool, int queueSize, ProcessingJob<T> task) {
queue = new LinkedBlockingQueue<>(queueSize);
this.executorService = executorService;
sem = new Semaphore(parallelism);
this.pool = pool;
executorService.submit(() -> executeJob(task));
pool.submit(() -> executeJob(task));
}
public static Factory factory(int queueSize, int parallelism) {
@ -50,20 +45,13 @@ public class ProcessingIterator<T> implements Iterator<T> {
}
}
@SneakyThrows
private void executeTask(Task<T> task) {
try {
sem.acquire();
} catch (InterruptedException e) {
return;
}
executorService.submit(() -> {
pool.submit(() -> {
try {
queue.put(task.get());
} catch (Exception e) {
logger.warn("Exception while processing", e);
} finally {
sem.release();
}
});
}
@ -97,7 +85,7 @@ public class ProcessingIterator<T> implements Iterator<T> {
private boolean expectMore() {
return !isFinished.get() // we are still reading from the database
|| !queue.isEmpty() // ... or we have documents in the queue
|| sem.availablePermits() < parallelism; // ... or we are still processing documents
|| pool.getActiveCount() > 0; // ... or we are still processing documents
}
/** Returns the next document to be processed.
@ -142,24 +130,17 @@ public class ProcessingIterator<T> implements Iterator<T> {
public static class Factory {
private final int queueSize;
private final int parallelism;
private final ExecutorService executorService;
private final SimpleBlockingThreadPool pool;
Factory(int queueSize, int parallelism) {
this.queueSize = queueSize;
this.parallelism = parallelism;
this.executorService = Executors.newFixedThreadPool(parallelism);
this.pool = new SimpleBlockingThreadPool("sideload", parallelism, 4);
}
public <T> ProcessingIterator<T> create(ProcessingJob<T> task) {
return new ProcessingIterator<>(executorService, queueSize, parallelism, task);
return new ProcessingIterator<>(pool, queueSize, task);
}
public void stop() {
if (!executorService.isShutdown()) {
executorService.shutdown();
}
}
}
}

View File

@ -86,7 +86,9 @@ public class DomainProcessor {
private final Set<String> processedUrls = new HashSet<>();
private final DomainLinks externalDomainLinks;
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
private static ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(24, 16);
private static final ProcessingIterator.Factory iteratorFactory = ProcessingIterator.factory(8,
Integer.getInteger("java.util.concurrent.ForkJoinPool.common.parallelism", Runtime.getRuntime().availableProcessors())
);
SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException {
this.dataStream = dataStream;