(converter) Hook crawl job extractor and adjacencies calculator into control service.

This commit is contained in:
Viktor Lofgren 2023-07-26 15:46:22 +02:00
parent 19c2ceec9b
commit a5d980ee56
12 changed files with 273 additions and 25 deletions

View File

@ -26,6 +26,14 @@ tasks.register('dist', Copy) {
from tarTree("$buildDir/dist/loader-process.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/website-adjacencies-calculator.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/crawl-job-extractor-process.tar")
into "$projectDir/run/dist/"
}
}
}
idea {

View File

@ -93,12 +93,12 @@ public class ControlService extends Service {
Spark.post("/public/fsms/:fsm/start", controlActorService::startFsm, redirectToProcesses);
Spark.post("/public/fsms/:fsm/stop", controlActorService::stopFsm, redirectToProcesses);
Spark.post("/public/storage/:fid/crawl", controlActorService::triggerCrawling, redirectToProcesses);
Spark.post("/public/storage/:fid/recrawl", controlActorService::triggerRecrawling, redirectToProcesses);
Spark.post("/public/storage/:fid/process", controlActorService::triggerProcessing, redirectToProcesses);
Spark.post("/public/storage/:fid/load", controlActorService::loadProcessedData, redirectToProcesses);
Spark.post("/public/storage/specs", controlActorService::createCrawlSpecification, redirectToStorage);
Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage);
Spark.get("/public/:resource", this::serveStatic);

View File

@ -4,13 +4,11 @@ import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.control.actor.task.CrawlActor;
import nu.marginalia.control.actor.task.RecrawlActor;
import nu.marginalia.control.actor.task.*;
import nu.marginalia.control.model.Actor;
import nu.marginalia.control.actor.monitor.*;
import nu.marginalia.control.actor.monitor.ConverterMonitorActor;
import nu.marginalia.control.actor.monitor.LoaderMonitorActor;
import nu.marginalia.control.actor.task.ReconvertAndLoadActor;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mqsm.StateMachine;
@ -45,7 +43,9 @@ public class ControlActors {
LoaderMonitorActor loaderMonitor,
MessageQueueMonitorActor messageQueueMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
FileStorageMonitorActor fileStorageMonitorActor
FileStorageMonitorActor fileStorageMonitorActor,
TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor,
CrawlJobExtractorActor crawlJobExtractorActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
@ -60,6 +60,8 @@ public class ControlActors {
register(Actor.MESSAGE_QUEUE_MONITOR, messageQueueMonitor);
register(Actor.PROCESS_LIVENESS_MONITOR, processMonitorFSM);
register(Actor.FILE_STORAGE_MONITOR, fileStorageMonitorActor);
register(Actor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor);
register(Actor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor);
}
private void register(Actor process, AbstractStateGraph graph) {

View File

@ -0,0 +1,135 @@
package nu.marginalia.control.actor.task;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ControlFileStorageService;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageBaseType;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Actor that creates a crawl specification by spawning a CrawlJobExtractor process,
 * either from the link database (CREATE_FROM_DB) or from a downloaded list of URLs
 * (CREATE_FROM_LINK).  The resulting spec is written into newly allocated CRAWL_SPEC
 * file storage, which is flagged for deletion if the job fails.
 */
@Singleton
public class CrawlJobExtractorActor extends AbstractStateGraph {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // STATES
    public static final String INITIAL = "INITIAL";
    public static final String CREATE_FROM_DB = "CREATE_FROM_DB";
    public static final String CREATE_FROM_LINK = "CREATE_FROM_LINK";
    public static final String END = "END";

    private final ProcessService processService;
    private final FileStorageService fileStorageService;
    private final ControlFileStorageService controlFileStorageService;

    // Single worker thread: one extraction job runs at a time, and run() blocks on it.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Inject
    public CrawlJobExtractorActor(StateFactory stateFactory,
                                  ProcessService processService,
                                  FileStorageService fileStorageService,
                                  ControlFileStorageService controlFileStorageService
                                  ) {
        super(stateFactory);
        this.processService = processService;
        this.fileStorageService = fileStorageService;
        this.controlFileStorageService = controlFileStorageService;
    }

    /** Arguments for CREATE_FROM_DB: a human-readable description for the allocated storage. */
    public record CrawlJobExtractorArguments(String description) { }
    /** Arguments for CREATE_FROM_LINK: a description plus the URL of a URL-list to download. */
    public record CrawlJobExtractorArgumentsWithURL(String description, String url) { }

    @GraphState(name = INITIAL, next = END)
    public void initial() throws Exception {
        // This actor must be started at CREATE_FROM_DB or CREATE_FROM_LINK
        error("This state does nothing");
    }

    // NOTE(review): the method name has a doubled "From"; kept as-is because renaming a
    // public method is not backward compatible -- the state machine dispatches on the
    // @GraphState name anyway.  Consider renaming in a separate change.
    @GraphState(name = CREATE_FROM_LINK, next = END,
            resume = ResumeBehavior.ERROR,
            description = """
                    Download a list of URLs as provided,
                    and then spawn a CrawlJobExtractor process,
                    then wait for it to finish.
                    """
    )
    public void createFromFromLink(CrawlJobExtractorArgumentsWithURL arg) throws Exception {
        if (arg == null) {
            error("This actor requires a CrawlJobExtractorArgumentsWithURL argument");
        }

        var base = fileStorageService.getStorageBase(FileStorageBaseType.SLOW);
        var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description());

        Path urlsTxt = storage.asPath().resolve("urls.txt");

        // CREATE_NEW: fail rather than clobber if the file somehow already exists
        try (var os = Files.newOutputStream(urlsTxt, StandardOpenOption.CREATE_NEW);
             var is = new URL(arg.url()).openStream())
        {
            is.transferTo(os);
        }
        catch (Exception ex) {
            // Release the partially-populated storage before signalling failure
            controlFileStorageService.flagFileForDeletion(storage.id());
            error("Error downloading " + arg.url());
        }

        final Path path = storage.asPath();

        run(storage, path.resolve("crawler.spec").toString(),
                "-f", urlsTxt.toString());
    }

    @GraphState(name = CREATE_FROM_DB, next = END,
            resume = ResumeBehavior.ERROR,
            description = """
                    Spawns a CrawlJobExtractor process that loads data from the link database, and wait for it to finish.
                    """
    )
    public void createFromDB(CrawlJobExtractorArguments arg) throws Exception {
        if (arg == null) {
            error("This actor requires a CrawlJobExtractorArguments argument");
        }

        var base = fileStorageService.getStorageBase(FileStorageBaseType.SLOW);
        var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description());

        final Path path = storage.asPath();

        run(storage,
                path.resolve("crawler.spec").toString());
    }

    /**
     * Spawns the CRAWL_JOB_EXTRACTOR process with the given arguments and blocks until it
     * finishes.  On failure, flags the storage for deletion and signals an actor error.
     */
    private void run(FileStorage storage, String... args) throws Exception {
        AtomicBoolean hasError = new AtomicBoolean(false);
        var future = executor.submit(() -> {
            try {
                processService.trigger(ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR,
                        args);
            }
            catch (Exception ex) {
                logger.warn("Error in creating crawl job", ex);
                hasError.set(true);
            }
        });
        future.get();

        if (hasError.get()) {
            controlFileStorageService.flagFileForDeletion(storage.id());
            // Fixed copy-paste: this message previously read
            // "Error triggering adjacency calculation" (from TriggerAdjacencyCalculationActor)
            error("Error running crawl job extractor");
        }
    }
}

View File

@ -0,0 +1,59 @@
package nu.marginalia.control.actor.task;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Actor that launches the website-adjacencies-calculator process and blocks
 * until it has exited, signalling an actor error if the process failed to run.
 */
@Singleton
public class TriggerAdjacencyCalculationActor extends AbstractStateGraph {

    private final Logger logger = LoggerFactory.getLogger(getClass());

    // STATES
    private static final String INITIAL = "INITIAL";
    private static final String END = "END";

    private final ProcessService processService;

    // Single worker thread; only one calculation runs at a time
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Inject
    public TriggerAdjacencyCalculationActor(StateFactory stateFactory,
                                            ProcessService processService) {
        super(stateFactory);
        this.processService = processService;
    }

    @GraphState(name = INITIAL, next = END,
            resume = ResumeBehavior.ERROR,
            description = """
                    Spawns a WebsitesAdjacenciesCalculator process and waits for it to finish.
                    """
    )
    public void init() throws Exception {
        final AtomicBoolean failed = new AtomicBoolean(false);

        Runnable job = () -> {
            try {
                processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load");
            }
            catch (Exception ex) {
                logger.warn("Error triggering adjacency calculation", ex);
                failed.set(true);
            }
        };

        // Block until the spawned process has terminated
        executor.submit(job).get();

        if (failed.get()) {
            error("Error triggering adjacency calculation");
        }
    }
}

View File

@ -9,7 +9,9 @@ public enum Actor {
CRAWLER_MONITOR,
MESSAGE_QUEUE_MONITOR,
PROCESS_LIVENESS_MONITOR,
FILE_STORAGE_MONITOR
FILE_STORAGE_MONITOR,
ADJACENCY_CALCULATION,
CRAWL_JOB_EXTRACTOR
;

View File

@ -42,6 +42,8 @@ public record ProcessHeartbeat(
case "converter" -> ProcessService.ProcessId.CONVERTER;
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "crawl-job-extractor" -> ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR;
default -> throw new RuntimeException("Unknown process base: " + processBase);
};
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.actor.ControlActors;
import nu.marginalia.control.actor.task.CrawlJobExtractorActor;
import nu.marginalia.control.actor.task.ReconvertAndLoadActor;
import nu.marginalia.control.actor.task.RecrawlActor;
import nu.marginalia.control.model.Actor;
@ -94,4 +95,27 @@ public class ControlActorService {
}).toList();
}
public Object createCrawlSpecification(Request request, Response response) throws Exception {
    final String description = request.queryParams("description");
    final String url = request.queryParams("url");
    final String source = request.queryParams("source");

    // Guard-clause style: handle each known source and return early.
    // String.equals on the constant side tolerates a null 'source' parameter.
    if ("db".equals(source)) {
        controlActors.startFrom(Actor.CRAWL_JOB_EXTRACTOR,
                CrawlJobExtractorActor.CREATE_FROM_DB,
                new CrawlJobExtractorActor.CrawlJobExtractorArguments(description)
        );
        return "";
    }

    if ("download".equals(source)) {
        controlActors.startFrom(Actor.CRAWL_JOB_EXTRACTOR,
                CrawlJobExtractorActor.CREATE_FROM_LINK,
                new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL(description, url)
        );
        return "";
    }

    throw new IllegalArgumentException("Unknown source: " + source);
}
}

View File

@ -7,7 +7,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import spark.utils.IOUtils;
import javax.inject.Inject;
import javax.inject.Singleton;
@ -33,7 +32,11 @@ public class ProcessService {
public enum ProcessId {
CRAWLER("crawler-process/bin/crawler-process"),
CONVERTER("converter-process/bin/converter-process"),
LOADER("loader-process/bin/loader-process");
LOADER("loader-process/bin/loader-process"),
ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator"),
CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process"),
;
public final String path;
ProcessId(String path) {
@ -49,10 +52,17 @@ public class ProcessService {
}
public boolean trigger(ProcessId processId) throws Exception {
    // Convenience overload: start the process with no additional arguments
    final String[] noArgs = {};
    return trigger(processId, noArgs);
}
public boolean trigger(ProcessId processId, String... parameters) throws Exception {
String processPath = processPath(processId);
String[] args = new String[] {
processPath
};
String[] args = new String[parameters.length + 1];
args[0] = processPath;
for (int i = 0; i < parameters.length; i++)
args[i+1] = parameters[i];
String[] env = env();
Process process;

View File

@ -28,6 +28,8 @@ public class CrawlJobExtractorMain {
return;
}
// TODO (2023-06-26) figure out whether this needs a ProcessHeartbeat
String[] targetDomains = getTargetDomains(Arrays.copyOfRange(args, 1, args.length));
try (CrawlJobSpecWriter out = new CrawlJobSpecWriter(outFile))

View File

@ -19,6 +19,7 @@ java {
dependencies {
implementation project(':code:common:model')
implementation project(':code:common:db')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation libs.lombok

View File

@ -2,9 +2,11 @@ package nu.marginalia.adjacencies;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.id.EdgeId;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.service.module.DatabaseModule;
import java.sql.SQLException;
@ -58,30 +60,22 @@ public class WebsiteAdjacenciesCalculator {
}
@SneakyThrows
public void loadAll() {
public void loadAll(ProcessHeartbeat processHeartbeat) {
AdjacenciesLoader loader = new AdjacenciesLoader(dataSource);
var executor = Executors.newFixedThreadPool(16);
var ids = adjacenciesData.getIdsList();
ProgressPrinter progressPrinter = new ProgressPrinter(ids.size());
progressPrinter.start();
int total = adjacenciesData.getIdsList().size();
AtomicInteger progress = new AtomicInteger(0);
IntStream.of(adjacenciesData.getIdsList().toArray()).parallel()
.filter(domainAliases::isNotAliased)
.forEach(id -> {
findAdjacent(id, loader::load);
progressPrinter.advance();
processHeartbeat.setProgress(progress.incrementAndGet() / (double) total);
});
progressPrinter.stop();
executor.shutdown();
System.out.println("Waiting for wrap-up");
loader.stop();
}
private static class ProgressPrinter {
@ -192,10 +186,19 @@ public class WebsiteAdjacenciesCalculator {
public static void main(String[] args) throws SQLException {
DatabaseModule dm = new DatabaseModule();
var main = new WebsiteAdjacenciesCalculator(dm.provideConnection());
var dataSource = dm.provideConnection();
var main = new WebsiteAdjacenciesCalculator(dataSource);
if (args.length == 1 && "load".equals(args[0])) {
main.loadAll();
var processHeartbeat = new ProcessHeartbeat(
new ProcessConfiguration("website-adjacencies-calculator", 0, UUID.randomUUID()),
dataSource
);
processHeartbeat.start();
main.loadAll(processHeartbeat);
processHeartbeat.shutDown();
return;
}