(blocking-thread-pool) Move DumbThreadPool to its own micro-library

Also rename it to SimpleBlockingThreadPool.
This commit is contained in:
Viktor Lofgren 2023-09-20 10:11:49 +02:00
parent f6b9e8c5eb
commit d895f83520
10 changed files with 123 additions and 142 deletions

View File

@ -0,0 +1,27 @@
// Build script for the blocking-thread-pool micro-library.
plugins {
id 'java'
}
// Compile and run with a Java 20 toolchain.
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
// NOTE(review): `libs.*` presumably resolves through the project's shared version catalog -- confirm.
dependencies {
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.notnull
implementation libs.fastutil
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
// Run tests on the JUnit Platform.
test {
useJUnitPlatform()
}

View File

@ -1,40 +1,37 @@
package nu.marginalia.crawl;
package nu.marginalia.util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** A simple thread pool implementation that will never invoke
* a task in the calling thread like {@link java.util.concurrent.ThreadPoolExecutor}
* does when the queue is full. Instead, it will block until a thread
* becomes available to run the task. This is useful for coarse grained
* tasks where the calling thread might otherwise block for hours.
/** A dead simple thread pool implementation that will block the caller
* when it is not able to perform a task. This is desirable in batch
* processing workloads.
*/
// TODO: This class exists in converter as well, should probably be broken out into a common library; use this version
public class DumbThreadPool {
public class SimpleBlockingThreadPool {
private final List<Thread> workers = new ArrayList<>();
private final LinkedBlockingQueue<Task> tasks;
private final BlockingQueue<Task> tasks;
private volatile boolean shutDown = false;
private final AtomicInteger taskCount = new AtomicInteger(0);
private final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class);
private final Logger logger = LoggerFactory.getLogger(SimpleBlockingThreadPool.class);
public DumbThreadPool(int poolSize, int queueSize) {
tasks = new LinkedBlockingQueue<>(queueSize);
public SimpleBlockingThreadPool(String name, int poolSize, int queueSize) {
tasks = new ArrayBlockingQueue<>(queueSize);
for (int i = 0; i < poolSize; i++) {
Thread worker = new Thread(this::worker, "Crawler Thread " + i);
Thread worker = new Thread(this::worker, name + "[" + i + "]");
worker.setDaemon(true);
worker.start();
workers.add(worker);
}
}
public void submit(Task task) throws InterruptedException {
tasks.put(task);
}
@ -126,4 +123,5 @@ public class DumbThreadPool {
public interface Task {
void run() throws Exception;
}
}

View File

@ -35,6 +35,7 @@ dependencies {
implementation project(':code:common:service')
implementation project(':code:common:config')
implementation project(':code:libraries:message-queue')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')

View File

@ -14,11 +14,10 @@ import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.util.SimpleBlockingThreadPool;
import nu.marginalia.worklog.BatchingWorkLog;
import nu.marginalia.worklog.BatchingWorkLogImpl;
import plan.CrawlPlan;
@ -26,7 +25,6 @@ import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
@ -107,7 +105,7 @@ public class ConverterMain {
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile());
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir()))
{
var pool = new DumbThreadPool(maxPoolSize, 2);
var pool = new SimpleBlockingThreadPool("ConverterThread", maxPoolSize, 2);
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);

View File

@ -1,119 +0,0 @@
package nu.marginalia.converting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/** A simple thread pool implementation that never runs a task in the
 * calling thread, which {@link java.util.concurrent.ThreadPoolExecutor}
 * can do when its queue is full (e.g. with a caller-runs rejection policy).
 * Instead, {@link #submit(Runnable)} blocks until there is room in the
 * bounded task queue. This is useful for coarse grained tasks where the
 * calling thread might otherwise block for hours.
 * <p>
 * Worker threads are daemon threads and are started eagerly by the constructor.
 */
// TODO: This class exists in crawler as well, should probably be broken out into a common library; use the one from crawler instead
public class DumbThreadPool {
    private static final Logger logger = LoggerFactory.getLogger(DumbThreadPool.class);

    private final List<Thread> workers = new ArrayList<>();
    private final LinkedBlockingQueue<Runnable> tasks;

    /** Set by both shutdown variants; workers stop once this is set and the queue is drained. */
    private volatile boolean shutDown = false;

    /** Set only by {@link #shutDownNow()}; workers stop without draining the queue. */
    private volatile boolean shutDownNow = false;

    /** Number of tasks currently being executed by worker threads. */
    private final AtomicInteger taskCount = new AtomicInteger(0);

    /** Create the pool and start its worker threads.
     *
     * @param poolSize  number of worker threads to start
     * @param queueSize capacity of the task queue; {@link #submit(Runnable)} blocks when it is full
     */
    public DumbThreadPool(int poolSize, int queueSize) {
        tasks = new LinkedBlockingQueue<>(queueSize);

        for (int i = 0; i < poolSize; i++) {
            Thread worker = new Thread(this::worker, "Converter Thread " + i);
            worker.setDaemon(true);
            worker.start();
            workers.add(worker);
        }
    }

    /** Enqueue a task, blocking the caller while the queue is full.
     *
     * @param runnable the task to run on a worker thread
     * @throws InterruptedException if the calling thread is interrupted while waiting for queue space
     */
    public void submit(Runnable runnable) throws InterruptedException {
        tasks.put(runnable);
    }

    /** Request a graceful shutdown: workers finish the running tasks and
     * the tasks already in the queue, then terminate.
     */
    public void shutDown() {
        this.shutDown = true;
    }

    /** Request an immediate shutdown: workers are interrupted and
     * tasks still in the queue are not started.
     */
    public void shutDownNow() {
        this.shutDownNow = true;
        this.shutDown = true;

        for (Thread worker : workers) {
            worker.interrupt();
        }
    }

    /** Main loop of each worker thread. */
    private void worker() {
        // Keep going after a graceful shutdown until the queue is empty, otherwise
        // tasks that were accepted by submit() would silently never run.
        while (!shutDownNow && (!shutDown || !tasks.isEmpty())) {
            try {
                // Poll with a timeout so the loop condition is re-evaluated regularly
                Runnable task = tasks.poll(1, TimeUnit.SECONDS);
                if (task == null) {
                    continue;
                }

                try {
                    taskCount.incrementAndGet();
                    task.run();
                }
                catch (Exception ex) {
                    // A failing task must not take the worker thread down with it
                    logger.warn("Error executing task", ex);
                }
                finally {
                    taskCount.decrementAndGet();
                }
            }
            catch (InterruptedException ex) {
                logger.warn("Thread pool worker interrupted", ex);
                // Restore the interrupt status that was cleared when the exception was thrown
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Wait for all tasks to complete up to the specified timeout,
     * then return true if all tasks completed, false otherwise.
     *
     * @param i        the maximum time to wait
     * @param timeUnit the unit of {@code i}
     * @return true if every worker thread terminated within the timeout and no task is counted as active
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean awaitTermination(int i, TimeUnit timeUnit) throws InterruptedException {
        final long start = System.currentTimeMillis();
        final long deadline = start + timeUnit.toMillis(i);

        for (var thread : workers) {
            if (!thread.isAlive())
                continue;

            // The timeout is shared between all workers, so only wait for what is left of it
            long timeRemaining = deadline - System.currentTimeMillis();
            if (timeRemaining <= 0)
                return false;

            thread.join(timeRemaining);
            if (thread.isAlive())
                return false;
        }

        // Double-check the bookkeeping so we didn't mess up. This may mean you have to Ctrl+C the process
        // if you see this warning forever, but for the crawler this is preferable to terminating early
        // and missing tasks. (Maybe some cosmic ray or OOM condition or X-Files baddie of the week killed a
        // thread so hard that it didn't invoke finally and didn't decrement the task count.)
        int activeCount = getActiveCount();
        if (activeCount != 0) {
            logger.warn("Thread pool terminated with {} active threads(?!) -- check what's going on with jstack and kill manually", activeCount);
            return false;
        }

        return true;
    }

    /** @return the number of tasks currently being executed */
    public int getActiveCount() {
        return taskCount.get();
    }
}

View File

@ -17,6 +17,7 @@ public class HtmlProcessorSpecializations {
private final XenForoSpecialization xenforoSpecialization;
private final PhpBBSpecialization phpBBSpecialization;
private final JavadocSpecialization javadocSpecialization;
private final MariadbKbSpecialization mariadbKbSpecialization;
private final BlogSpecialization blogSpecialization;
private final DefaultSpecialization defaultSpecialization;
@ -26,13 +27,14 @@ public class HtmlProcessorSpecializations {
XenForoSpecialization xenforoSpecialization,
PhpBBSpecialization phpBBSpecialization,
JavadocSpecialization javadocSpecialization,
BlogSpecialization blogSpecialization,
MariadbKbSpecialization mariadbKbSpecialization, BlogSpecialization blogSpecialization,
DefaultSpecialization defaultSpecialization) {
this.domainTypes = domainTypes;
this.lemmySpecialization = lemmySpecialization;
this.xenforoSpecialization = xenforoSpecialization;
this.phpBBSpecialization = phpBBSpecialization;
this.javadocSpecialization = javadocSpecialization;
this.mariadbKbSpecialization = mariadbKbSpecialization;
this.blogSpecialization = blogSpecialization;
this.defaultSpecialization = defaultSpecialization;
}
@ -47,6 +49,11 @@ public class HtmlProcessorSpecializations {
return blogSpecialization;
}
if (url.domain.getDomain().equals("mariadb.com")
&& url.path.startsWith("/kb")) {
return mariadbKbSpecialization;
}
if (generator.keywords().contains("lemmy")) {
return lemmySpecialization;
}

View File

@ -0,0 +1,66 @@
package nu.marginalia.converting.processor.plugin.specialization;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.keyword.model.DocumentKeywordsBuilder;
import nu.marginalia.model.idx.WordFlags;
import nu.marginalia.summary.SummaryExtractor;
import org.apache.commons.lang3.StringUtils;
import org.jsoup.nodes.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
/** Specialization that overrides document pruning and keyword amendment
 * of {@link DefaultSpecialization}; see {@link #prune(Document)} and
 * {@link #amendWords(Document, DocumentKeywordsBuilder)}.
 */
@Singleton
public class MariadbKbSpecialization extends DefaultSpecialization {
    private static final Logger logger = LoggerFactory.getLogger(MariadbKbSpecialization.class);

    @Inject
    public MariadbKbSpecialization(SummaryExtractor summaryExtractor) {
        super(summaryExtractor);
    }

    /** Build a new document that contains only the inner HTML of the
     * element with id "content", wrapped in a {@code <section>} inside
     * a {@code <body>}.
     * <p>
     * The element with id "comments", if present, is removed from the
     * input document before the content is copied.
     *
     * @param doc the document to prune; note that it is modified when it has a "comments" element
     * @return a new document; its body is empty when the input has no "content" element
     */
    @Override
    public Document prune(Document doc) {
        var newDoc = new Document(doc.baseUri());
        var bodyTag = newDoc.appendElement("body");

        // Must happen before the content is copied below, so that a comments
        // section nested inside the content element is left out of the copy
        var comments = doc.getElementById("comments");
        if (comments != null)
            comments.remove();

        var contentTag = doc.getElementById("content");
        if (contentTag != null)
            bodyTag.appendChild(newDoc.createElement("section").html(contentTag.html()));

        return newDoc;
    }

    /** Collect keywords from the {@code <strong>} elements of the document and
     * pass them to {@code words} with the {@link WordFlags#Subjects} flag.
     * <p>
     * Elements whose text is blank, contains ':' or '(', or has more than four
     * whitespace separated words are ignored. For the remaining elements, each
     * lower-cased word is added, as well as each pair of adjacent words joined
     * with an underscore.
     *
     * @param doc   the document to extract keywords from
     * @param words the keyword builder to flag the extracted keywords on
     */
    @Override
    public void amendWords(Document doc, DocumentKeywordsBuilder words) {
        Set<String> toAdd = new HashSet<>();

        for (var elem : doc.getElementsByTag("strong")) {
            var text = elem.text();

            // An empty <strong> tag would otherwise contribute "" as a keyword
            if (text.isBlank())
                continue;
            if (text.contains(":"))
                continue;
            if (text.contains("("))
                continue;

            // Locale.ROOT keeps the casing independent of the JVM's default locale
            String[] keywords = text.toLowerCase(Locale.ROOT).split("\\s+");
            if (keywords.length > 4)
                continue;

            toAdd.addAll(List.of(keywords));

            // Adjacent word pairs, e.g. "foo bar" -> "foo_bar"
            for (int i = 1; i < keywords.length; i++) {
                toAdd.add(keywords[i-1] + "_" + keywords[i]);
            }
        }

        logger.debug("Generated keywords: {}", toAdd);

        words.setFlagOnMetadataForWords(WordFlags.Subjects, toAdd);
    }
}

View File

@ -26,6 +26,7 @@ dependencies {
implementation project(':code:common:config')
implementation project(':code:common:service')
implementation project(':code:libraries:big-string')
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:api:index-api')
implementation project(':code:api:process-mqapi')
implementation project(':code:common:service-discovery')

View File

@ -23,6 +23,7 @@ import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.util.SimpleBlockingThreadPool;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.internal.Util;
@ -53,7 +54,7 @@ public class CrawlerMain {
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final Gson gson;
private final DumbThreadPool pool;
private final SimpleBlockingThreadPool pool;
private final Map<String, String> processingIds = new ConcurrentHashMap<>();
private final CrawledDomainReader reader = new CrawledDomainReader();
@ -77,7 +78,7 @@ public class CrawlerMain {
this.gson = gson;
// maybe need to set -Xss for JVM to deal with this?
pool = new DumbThreadPool(CrawlLimiter.maxPoolSize, 1);
pool = new SimpleBlockingThreadPool("CrawlerPool", CrawlLimiter.maxPoolSize, 1);
}
public static void main(String... args) throws Exception {
@ -150,7 +151,7 @@ public class CrawlerMain {
}
}
class CrawlTask implements DumbThreadPool.Task {
class CrawlTask implements SimpleBlockingThreadPool.Task {
private final CrawlSpecRecord specification;

View File

@ -16,6 +16,7 @@ include 'code:libraries:guarded-regex'
include 'code:libraries:big-string'
include 'code:libraries:random-write-funnel'
include 'code:libraries:next-prime'
include 'code:libraries:blocking-thread-pool'
include 'code:libraries:braille-block-punch-cards'
include 'code:libraries:language-processing'
include 'code:libraries:term-frequency-dict'