(*) WIP Add node affinity to EC_DOMAIN

Very messy commit due to fractalline yak shaving
Viktor Lofgren 2023-10-19 13:22:52 +02:00
parent 2bf0c4497d
commit 81dd3809e9
45 changed files with 1392 additions and 619 deletions

View File

@ -4,18 +4,21 @@ import com.google.inject.Inject;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.executor.model.ActorRunStates;
import nu.marginalia.executor.model.crawl.RecrawlParameters;
import nu.marginalia.executor.model.load.LoadParameters;
import nu.marginalia.executor.model.transfer.TransferItem;
import nu.marginalia.executor.model.transfer.TransferSpec;
import nu.marginalia.executor.storage.FileStorageContent;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.storage.model.FileStorageId;
import java.io.OutputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;
public class ExecutorClient extends AbstractDynamicClient {
@Inject
@ -35,8 +38,8 @@ public class ExecutorClient extends AbstractDynamicClient {
post(ctx, node, "/process/crawl/" + fid, "").blockingSubscribe();
}
public void triggerRecrawl(Context ctx, int node, RecrawlParameters parameters) {
post(ctx, node, "/process/recrawl", parameters).blockingSubscribe();
public void triggerRecrawl(Context ctx, int node, FileStorageId fid) {
post(ctx, node, "/process/recrawl", fid).blockingSubscribe();
}
public void triggerConvert(Context ctx, int node, FileStorageId fid) {
@ -88,11 +91,6 @@ public class ExecutorClient extends AbstractDynamicClient {
"").blockingSubscribe();
}
public void createCrawlSpecFromDb(Context context, int node, String description) {
post(context, node, "/process/crawl-spec/from-db?description="+URLEncoder.encode(description, StandardCharsets.UTF_8), "")
.blockingSubscribe();
}
public void createCrawlSpecFromDownload(Context context, int node, String description, String url) {
post(context, node, "/process/crawl-spec/from-download?description="+URLEncoder.encode(description, StandardCharsets.UTF_8)+"&url="+URLEncoder.encode(url, StandardCharsets.UTF_8), "")
.blockingSubscribe();
@ -110,4 +108,21 @@ public class ExecutorClient extends AbstractDynamicClient {
return get(context, node, "/storage/"+fileId.id(), FileStorageContent.class).blockingFirst();
}
public void transferFile(Context context, int node, FileStorageId fileId, String path, OutputStream destOutputStream) {
String endpoint = "/transfer/file/%d?path=%s".formatted(fileId.id(), URLEncoder.encode(path, StandardCharsets.UTF_8));
get(context, node, endpoint,
destOutputStream)
.blockingSubscribe();
}
public TransferSpec getTransferSpec(Context context, int node, int count) {
return get(context, node, "/transfer/spec?count="+count, TransferSpec.class)
.timeout(30, TimeUnit.MINUTES)
.blockingFirst();
}
public void yieldDomain(Context context, int node, TransferItem item) {
post(context, node, "/transfer/yield", item).blockingSubscribe();
}
}

View File

@ -1,11 +0,0 @@
package nu.marginalia.executor.model.crawl;
import nu.marginalia.storage.model.FileStorageId;
import java.util.List;
public record RecrawlParameters(
FileStorageId crawlDataId,
List<FileStorageId> crawlSpecIds
) {
}

View File

@ -0,0 +1,9 @@
package nu.marginalia.executor.model.transfer;
import nu.marginalia.storage.model.FileStorageId;
public record TransferItem(String domainName,
int domainId,
FileStorageId fileStorageId,
String path) {
}

View File

@ -0,0 +1,13 @@
package nu.marginalia.executor.model.transfer;
import java.util.List;
public record TransferSpec(List<TransferItem> items) {
public TransferSpec() {
this(List.of());
}
public int size() {
return items.size();
}
}

View File

@ -8,6 +8,14 @@ import java.util.List;
/** A request to start a crawl */
@AllArgsConstructor
public class CrawlRequest {
/** (optional) Crawl spec(s) for sourcing domains to crawl. If not set,
* the EC_DOMAIN table will be consulted and domains with the corresponding
* node affinity will be used.
*/
public List<FileStorageId> specStorage;
/** File storage where the crawl data will be written. If it contains existing crawl data,
* this crawl data will be referenced for ETag and Last-Modified checks.
*/
public FileStorageId crawlStorage;
}
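A minimal construction sketch (illustration only, not part of the commit; the storage ids are made up, and FileStorageId is assumed to be constructible from a numeric id):
// specStorage set: domains are sourced from the referenced parquet crawl specs
var fromSpecs = new CrawlRequest(List.of(new FileStorageId(1)), new FileStorageId(2));
// specStorage null: domains are sourced from EC_DOMAIN by the node's affinity
var fromDb = new CrawlRequest(null, new FileStorageId(2));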

View File

@ -488,6 +488,10 @@ public class FileStorageService {
}
}
public List<FileStorageId> getActiveFileStorages(FileStorageType type) throws SQLException {
return getActiveFileStorages(node, type);
}
public List<FileStorageId> getActiveFileStorages(int nodeId, FileStorageType type) throws SQLException
{

View File

@ -14,6 +14,7 @@ import java.util.OptionalInt;
*/
public class DbDomainStatsExportMultitool implements AutoCloseable {
private final Connection connection;
private final int nodeId;
private final PreparedStatement knownUrlsQuery;
private final PreparedStatement visitedUrlsQuery;
private final PreparedStatement goodUrlsQuery;
@ -23,8 +24,9 @@ public class DbDomainStatsExportMultitool implements AutoCloseable {
private final PreparedStatement crawlQueueDomains;
private final PreparedStatement indexedDomainsQuery;
public DbDomainStatsExportMultitool(HikariDataSource dataSource) throws SQLException {
public DbDomainStatsExportMultitool(HikariDataSource dataSource, int nodeId) throws SQLException {
this.connection = dataSource.getConnection();
this.nodeId = nodeId;
knownUrlsQuery = connection.prepareStatement("""
SELECT KNOWN_URLS
@ -64,21 +66,14 @@ public class DbDomainStatsExportMultitool implements AutoCloseable {
""");
}
public OptionalInt getKnownUrls(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, knownUrlsQuery);
}
public OptionalInt getVisitedUrls(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, visitedUrlsQuery);
}
public OptionalInt getGoodUrls(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, goodUrlsQuery);
}
public OptionalInt getDomainId(String domainName) throws SQLException {
return executeNameToIntQuery(domainName, domainNameToId);
}
public List<String> getAllDomains() throws SQLException {
return executeListQuery(allDomainsQuery, 100_000);
}
public List<String> getCrawlQueueDomains() throws SQLException {
return executeListQuery(crawlQueueDomains, 100);
}

View File

@ -0,0 +1 @@
ALTER TABLE EC_DOMAIN ADD COLUMN NODE_AFFINITY INT NOT NULL;
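Worth noting: MariaDB backfills a newly added NOT NULL INT column with the implicit default 0 for existing rows, so pre-existing domains start out unassigned; the loader changes below are what set real affinities.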

View File

@ -20,4 +20,18 @@ public record WorkLogEntry(String id, String ts, String path, int cnt) {
return splitPattern.split(s, 2)[0];
}
public String fileName() {
if (path.contains("/")) {
return path.substring(path.lastIndexOf("/") + 1);
}
return path;
}
public String relPath() {
// Compatibility trick!
String relPath = fileName();
return relPath.substring(0, 2) + "/" + relPath.substring(2, 4) + "/" + relPath;
}
}
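A worked example (file name made up) of the two helpers above:
// path       = "/storage/crawl-data/1a/2b/1a2b3c4d.parquet"
// fileName() -> "1a2b3c4d.parquet"
// relPath()  -> "1a/2b/1a2b3c4d.parquet"  (first two + next two characters of the file name)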

View File

@ -17,7 +17,9 @@ import okhttp3.*;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.utils.IOUtils;
import java.io.OutputStream;
import java.net.ConnectException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
@ -231,6 +233,23 @@ public abstract class AbstractClient implements AutoCloseable {
.doFinally(() -> ThreadContext.remove("outbound-request"));
}
protected synchronized Observable<Integer> get(Context ctx, int node, String endpoint, OutputStream outputStream) {
ensureAlive(node);
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build();
return Observable.just(client.newCall(req))
.subscribeOn(scheduler().get())
.map(this::logInbound)
.map(Call::execute)
.map(this::logOutbound)
.map(rsp -> validateResponseStatus(rsp, req, 200))
.map(rsp -> copyToOutputStream(rsp, outputStream))
.retryWhen(this::retryHandler)
.timeout(timeout, TimeUnit.SECONDS)
.doFinally(() -> ThreadContext.remove("outbound-request"));
}
@SuppressWarnings("unchecked")
protected synchronized Observable<String> get(Context ctx, int node, String endpoint) {
ensureAlive(node);
@ -352,6 +371,13 @@ public abstract class AbstractClient implements AutoCloseable {
}
@SneakyThrows
private Integer copyToOutputStream(Response response, OutputStream outputStream) {
try (response) {
return IOUtils.copy(response.body().byteStream(), outputStream);
}
}
@SneakyThrows
private <T> T getEntity(Response response, Class<T> clazz) {
try (response) {

View File

@ -56,7 +56,7 @@ public class SimpleBlockingThreadPool {
}
private void worker() {
while (!tasks.isEmpty() || !shutDown) {
while (!(tasks.isEmpty() && shutDown)) {
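// Note: equivalent to the old condition by De Morgan's law; the rewrite reads as
// "keep working until shut down AND the queue is drained"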
try {
Task task = tasks.poll(1, TimeUnit.SECONDS);
if (task == null) {
@ -90,13 +90,6 @@ public class SimpleBlockingThreadPool {
final long start = System.currentTimeMillis();
final long deadline = start + timeUnit.toMillis(i);
// Drain the queue
while (!tasks.isEmpty()) {
long timeRemaining = deadline - System.currentTimeMillis();
if (timeRemaining <= 0)
return false;
}
// Wait for termination
for (var thread : workers) {
if (!thread.isAlive())

View File

@ -2,7 +2,6 @@ package nu.marginalia.crawlspec;
import nu.marginalia.db.DbDomainStatsExportMultitool;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileWriter;
import nu.marginalia.linkdb.LinkdbReader;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import java.io.IOException;
@ -126,18 +125,5 @@ public class CrawlSpecGenerator {
"https://" + domainName + "/"
);
}
static KnownUrlsListSource fromLinkdb(DbDomainStatsExportMultitool dbData,
LinkdbReader linkdbReader)
{
return domainName -> {
var maybeId = dbData.getDomainId(domainName);
if (maybeId.isEmpty())
return List.of();
return linkdbReader
.getUrlsFromDomain(maybeId.getAsInt());
};
}
}
}

View File

@ -123,6 +123,7 @@ public class ConverterMain {
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
logger.info("Processing {} domains", totalDomains);
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(batchingWorkLog.size());
@ -138,6 +139,9 @@ public class ConverterMain {
});
}
// Grace period in case we're loading like 1 item
Thread.sleep(100);
pool.shutDown();
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");

View File

@ -10,10 +10,12 @@ import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileReader;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
@ -53,6 +55,7 @@ public class CrawlerMain {
private final UserAgent userAgent;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final DbCrawlSpecProvider dbCrawlSpecProvider;
private final Gson gson;
private final int node;
private final SimpleBlockingThreadPool pool;
@ -72,11 +75,13 @@ public class CrawlerMain {
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService,
ProcessConfiguration processConfiguration,
DbCrawlSpecProvider dbCrawlSpecProvider,
Gson gson) {
this.heartbeat = heartbeat;
this.userAgent = userAgent;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.dbCrawlSpecProvider = dbCrawlSpecProvider;
this.gson = gson;
this.node = processConfiguration.node();
@ -109,7 +114,7 @@ public class CrawlerMain {
var instructions = crawler.fetchInstructions();
try {
crawler.run(instructions.crawlSpec, instructions.outputDir);
crawler.run(instructions.specProvider, instructions.outputDir);
instructions.ok();
}
catch (Exception ex) {
@ -123,30 +128,24 @@ public class CrawlerMain {
System.exit(0);
}
public void run(List<Path> crawlSpec, Path outputDir) throws InterruptedException, IOException {
public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException {
heartbeat.start();
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"))) {
// First a validation run to ensure the file is all good to parse
logger.info("Validating JSON");
int taskCount = 0;
for (var specs : crawlSpec) {
taskCount += CrawlSpecRecordParquetFileReader.count(specs);
}
totalTasks = taskCount;
totalTasks = specProvider.totalCount();
logger.info("Queued {} crawl tasks, let's go", taskCount);
logger.info("Queued {} crawl tasks, let's go", totalTasks);
for (var specs : crawlSpec) {
try (var specStream = CrawlSpecRecordParquetFileReader.stream(specs)) {
specStream
.takeWhile((e) -> abortMonitor.isAlive())
.filter(e -> !workLog.isJobFinished(e.domain))
.filter(e -> processingIds.put(e.domain, "") == null)
.map(e -> new CrawlTask(e, outputDir, workLog))
.forEach(pool::submitQuietly);
}
try (var specStream = specProvider.stream()) {
specStream
.takeWhile((e) -> abortMonitor.isAlive())
.filter(e -> !workLog.isJobFinished(e.domain))
.filter(e -> processingIds.put(e.domain, "") == null)
.map(e -> new CrawlTask(e, outputDir, workLog))
.forEach(pool::submitQuietly);
}
logger.info("Shutting down the pool, waiting for tasks to complete...");
@ -155,6 +154,9 @@ public class CrawlerMain {
do {
System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining");
} while (!pool.awaitTermination(60, TimeUnit.SECONDS));
}
catch (Exception ex) {
logger.error("Uncaught exception while running crawl tasks", ex);
}
finally {
heartbeat.shutDown();
@ -227,15 +229,15 @@ public class CrawlerMain {
private static class CrawlRequest {
private final List<Path> crawlSpec;
private final CrawlSpecProvider specProvider;
private final Path outputDir;
private final MqMessage message;
private final MqSingleShotInbox inbox;
CrawlRequest(List<Path> crawlSpec, Path outputDir, MqMessage message, MqSingleShotInbox inbox) {
CrawlRequest(CrawlSpecProvider specProvider, Path outputDir, MqMessage message, MqSingleShotInbox inbox) {
this.message = message;
this.inbox = inbox;
this.crawlSpec = crawlSpec;
this.specProvider = specProvider;
this.outputDir = outputDir;
}
@ -259,11 +261,20 @@ public class CrawlerMain {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class);
var specData = fileStorageService.getStorage(request.specStorage);
CrawlSpecProvider specProvider;
if (request.specStorage != null) {
var specData = fileStorageService.getStorage(request.specStorage);
specProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));
}
else {
specProvider = dbCrawlSpecProvider;
}
var crawlData = fileStorageService.getStorage(request.crawlStorage);
return new CrawlRequest(
CrawlSpecFileNames.resolve(specData),
specProvider,
crawlData.asPath(),
msg,
inbox);

View File

@ -0,0 +1,10 @@
package nu.marginalia.crawl.spec;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import java.util.stream.Stream;
public interface CrawlSpecProvider {
int totalCount() throws Exception;
Stream<CrawlSpecRecord> stream();
}
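The interface is small enough that test doubles are trivial; a hypothetical in-memory provider (illustration only, assuming the same imports as the interface plus java.util.List):
class ListCrawlSpecProvider implements CrawlSpecProvider {
private final List<CrawlSpecRecord> records;
ListCrawlSpecProvider(List<CrawlSpecRecord> records) { this.records = records; }
@Override public int totalCount() { return records.size(); }
@Override public Stream<CrawlSpecRecord> stream() { return records.stream(); }
}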

View File

@ -0,0 +1,76 @@
package nu.marginalia.crawl.spec;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;
public class DbCrawlSpecProvider implements CrawlSpecProvider {
private final HikariDataSource dataSource;
private final ProcessConfiguration processConfiguration;
private List<CrawlSpecRecord> domains;
private static final Logger logger = LoggerFactory.getLogger(DbCrawlSpecProvider.class);
@Inject
public DbCrawlSpecProvider(HikariDataSource dataSource,
ProcessConfiguration processConfiguration
) {
this.dataSource = dataSource;
this.processConfiguration = processConfiguration;
}
// Load the domains into memory to ensure the crawler is resilient to database blips
private List<CrawlSpecRecord> loadData() throws SQLException {
var domains = new ArrayList<CrawlSpecRecord>();
logger.info("Loading domains to be crawled");
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT DOMAIN_NAME, COALESCE(GOOD_URLS, 0)
FROM EC_DOMAIN
LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
WHERE NODE_AFFINITY=?
"""))
{
query.setInt(1, processConfiguration.node());
query.setFetchSize(10_000);
var rs = query.executeQuery();
while (rs.next()) {
domains.add(new CrawlSpecRecord(
rs.getString(1),
Math.clamp((int) (1.25 * rs.getInt(2)), 200, 10_000),
List.of()
));
}
}
logger.info("Loaded {} domains", domains.size());
return domains;
}
@Override
public int totalCount() throws SQLException {
if (domains == null) {
domains = loadData();
}
return domains.size();
}
@Override
public Stream<CrawlSpecRecord> stream() {
return domains.stream();
}
}
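Illustrative values (inputs assumed) for the crawl-depth heuristic above, 1.25 × GOOD_URLS clamped to [200, 10 000]:
// GOOD_URLS = 0      -> 200     (floor: domains without stats still get a probe crawl)
// GOOD_URLS = 4_000  -> 5_000   (1.25 * 4_000)
// GOOD_URLS = 40_000 -> 10_000  (ceiling)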

View File

@ -0,0 +1,37 @@
package nu.marginalia.crawl.spec;
import lombok.SneakyThrows;
import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileReader;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;
public class ParquetCrawlSpecProvider implements CrawlSpecProvider {
private final List<Path> files;
public ParquetCrawlSpecProvider(List<Path> files) {
this.files = files;
}
@Override
public int totalCount() throws IOException {
int total = 0;
for (var specs : files) {
total += CrawlSpecRecordParquetFileReader.count(specs);
}
return total;
}
@Override
public Stream<CrawlSpecRecord> stream() {
return files.stream().flatMap(this::streamQuietly);
}
@SneakyThrows
private Stream<CrawlSpecRecord> streamQuietly(Path file) {
return CrawlSpecRecordParquetFileReader.stream(file);
}
}

View File

@ -3,16 +3,17 @@ package nu.marginalia.loading.domains;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader;
import nu.marginalia.io.processed.DomainRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.model.EdgeDomain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.*;
@ -21,10 +22,14 @@ public class DomainLoaderService {
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
private final int nodeId;
@Inject
public DomainLoaderService(HikariDataSource dataSource) {
public DomainLoaderService(HikariDataSource dataSource,
ProcessConfiguration processConfiguration
) {
this.dataSource = dataSource;
this.nodeId = processConfiguration.node();
}
/** Read the domain names from each parquet file
@ -34,51 +39,32 @@ public class DomainLoaderService {
public DomainIdRegistry getOrCreateDomainIds(LoaderInputData inputData)
throws IOException, SQLException
{
Collection<String> domainNamesAll = readDomainNames(inputData);
return getDatabaseIds(domainNamesAll);
}
Collection<String> readDomainNames(LoaderInputData inputData) throws IOException {
final Set<String> domainNamesAll = new HashSet<>(100_000);
var domainFiles = inputData.listDomainFiles();
for (var file : domainFiles) {
domainNamesAll.addAll(DomainRecordParquetFileReader.getDomainNames(file));
}
var linkFiles = inputData.listDomainLinkFiles();
for (var file : linkFiles) {
domainNamesAll.addAll(DomainLinkRecordParquetFileReader.getDestDomainNames(file));
}
return domainNamesAll;
}
DomainIdRegistry getDatabaseIds(Collection<String> domainNamesAll) throws SQLException {
Set<String> domainNamesAll = new HashSet<>();
DomainIdRegistry ret = new DomainIdRegistry();
try (var conn = dataSource.getConnection();
var insertStmt = conn.prepareStatement("""
INSERT IGNORE INTO EC_DOMAIN (DOMAIN_NAME, DOMAIN_TOP) VALUES (?, ?)
""");
var selectStmt = conn.prepareStatement("""
SELECT ID, DOMAIN_NAME FROM EC_DOMAIN WHERE DOMAIN_NAME=?
""")
) {
int i = 0;
for (var domain : domainNamesAll) {
var parsed = new EdgeDomain(domain);
insertStmt.setString(1, domain);
insertStmt.setString(2, parsed.domain);
insertStmt.addBatch();
if (++i > 1000) {
i = 0;
insertStmt.executeBatch();
try (var inserter = new DomainInserter(conn, nodeId)) {
for (var domain : readSetDomainNames(inputData)) {
inserter.accept(new EdgeDomain(domain));
domainNamesAll.add(domain);
}
}
if (i > 0) {
insertStmt.executeBatch();
try (var inserter = new DomainInserter(conn, -1)) {
for (var domain : readReferencedDomainNames(inputData)) {
inserter.accept(new EdgeDomain(domain));
domainNamesAll.add(domain);
}
}
try (var updater = new DomainAffinityUpdater(conn, nodeId)) {
for (var domain : readSetDomainNames(inputData)) {
updater.accept(new EdgeDomain(domain));
}
}
for (var domain : domainNamesAll) {
@ -95,4 +81,87 @@ public class DomainLoaderService {
return ret;
}
Collection<String> readSetDomainNames(LoaderInputData inputData) throws IOException {
final Set<String> domainNamesAll = new HashSet<>(100_000);
var domainFiles = inputData.listDomainFiles();
for (var file : domainFiles) {
domainNamesAll.addAll(DomainRecordParquetFileReader.getDomainNames(file));
}
return domainNamesAll;
}
Collection<String> readReferencedDomainNames(LoaderInputData inputData) throws IOException {
final Set<String> domainNamesAll = new HashSet<>(100_000);
var linkFiles = inputData.listDomainLinkFiles();
for (var file : linkFiles) {
domainNamesAll.addAll(DomainLinkRecordParquetFileReader.getDestDomainNames(file));
}
return domainNamesAll;
}
private class DomainInserter implements AutoCloseable {
private final PreparedStatement statement;
private final int nodeAffinity;
private int count = 0;
public DomainInserter(Connection connection, int affinity) throws SQLException {
nodeAffinity = affinity;
statement = connection.prepareStatement("INSERT IGNORE INTO EC_DOMAIN (DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY) VALUES (?, ?, ?)");
}
public void accept(EdgeDomain domain) throws SQLException {
statement.setString(1, domain.toString());
statement.setString(2, domain.domain);
statement.setInt(3, nodeAffinity);
statement.addBatch();
if (++count > 1000) {
count = 0;
statement.executeBatch();
}
}
@Override
public void close() throws SQLException {
if (count > 0) {
statement.executeBatch();
}
statement.close();
}
}
private static class DomainAffinityUpdater implements AutoCloseable {
private final PreparedStatement statement;
private final int nodeAffinity;
private int count = 0;
public DomainAffinityUpdater(Connection connection, int affinity) throws SQLException {
this.nodeAffinity = affinity;
statement = connection.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY = ? WHERE DOMAIN_NAME=?");
}
public void accept(EdgeDomain domain) throws SQLException {
statement.setInt(1, nodeAffinity);
statement.setString(2, domain.toString());
statement.addBatch();
if (++count > 1000) {
count = 0;
statement.executeBatch();
}
}
@Override
public void close() throws SQLException {
if (count > 0) {
statement.executeBatch();
}
statement.close();
}
}
}
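Net effect, as read from the loader above: domains this node actually processed are inserted with NODE_AFFINITY = nodeId (and updated to it if they already existed), while domains that are merely link targets are inserted with affinity -1, which both the crawl-spec provider's NODE_AFFINITY = node filter and the rebalance query's NODE_AFFINITY > 0 filter will skip.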

View File

@ -1,6 +1,8 @@
package nu.marginalia.loading.domains;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter;
import nu.marginalia.io.processed.DomainRecordParquetFileWriter;
import nu.marginalia.io.processed.ProcessedDataFileNames;
@ -57,6 +59,7 @@ class DomainLoaderServiceTest {
toDelete.clear();
}
@Test
void readDomainNames() throws IOException {
Path workDir = Files.createTempDirectory(getClass().getSimpleName());
@ -92,43 +95,14 @@ class DomainLoaderServiceTest {
}
}
// Read them
var domainService = new DomainLoaderService(null);
var domainNames = domainService.readDomainNames(new LoaderInputData(workDir, 2));
var domainService = new DomainLoaderService(null, new ProcessConfiguration("test", 1, UUID.randomUUID()));
// Verify
Set<String> expectedDomains = Stream.of(domains1, domains2, linkDomains)
.flatMap(List::stream)
.collect(Collectors.toSet());
assertEquals(expectedDomains, domainNames);
}
Set<String> expectedDomains1 = Sets.union(new HashSet<>(domains1), new HashSet<>(domains2));
assertEquals(expectedDomains1, domainService.readSetDomainNames(new LoaderInputData(workDir, 2)));
@Test
void getDatabaseIds() {
try (var dataSource = DbTestUtil.getConnection(mariaDBContainer.getJdbcUrl())) {
var domainService = new DomainLoaderService(dataSource);
for (int i = 0; i < 2; i++) {
// run the test case twice to cover both the insert and query cases
System.out.println("Case " + i);
var domains = List.of("memex.marginalia.nu", "www.marginalia.nu", "search.marginalia.nu", "wiby.me");
var data = domainService.getDatabaseIds(domains);
Map<String, Integer> ids = new HashMap<>();
for (String domain : domains) {
ids.put(domain, data.getDomainId(domain));
}
// Verify we got 4 domain IDs for the provided inputs
var entries = new HashSet<>(ids.values());
assertEquals(4, entries.size());
assertEquals(Set.of(1,2,3,4), entries); // this may be fragile?
}
} catch (SQLException e) {
Assertions.fail(e);
}
Set<String> expectedDomains2 = new HashSet<>(linkDomains);
assertEquals(expectedDomains2, domainService.readReferencedDomainNames(new LoaderInputData(workDir, 2)));
}
private DomainRecord dr(String domainName) {

View File

@ -1,6 +1,9 @@
package nu.marginalia.loading.links;
import com.google.common.collect.Lists;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter;
import nu.marginalia.io.processed.DomainRecordParquetFileWriter;
import nu.marginalia.io.processed.ProcessedDataFileNames;
@ -11,7 +14,6 @@ import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
@ -43,8 +45,38 @@ class DomainLinksLoaderServiceTest {
.withInitScript("db/migration/V23_06_0_000__base.sql")
.withNetworkAliases("mariadb");
HikariDataSource dataSource;
@BeforeEach
public void setUp() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
config.setUsername("wmsa");
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
List<String> migrations = List.of("db/migration/V23_11_0_007__domain_node_affinity.sql");
for (String migration : migrations) {
try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration),
"Could not load migration script " + migration);
var conn = dataSource.getConnection();
var stmt = conn.createStatement()
) {
String script = new String(resource.readAllBytes());
String[] cmds = script.split("\\s*;\\s*");
for (String cmd : cmds) {
if (cmd.isBlank())
continue;
System.out.println(cmd);
stmt.executeUpdate(cmd);
}
} catch (IOException | SQLException ex) {
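// Presumably tolerated: the migration may already have been applied
// if the database container is reused between tests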
}
}
heartbeat = Mockito.mock(ProcessHeartbeat.class);
Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn(
@ -59,6 +91,7 @@ class DomainLinksLoaderServiceTest {
}
toDelete.clear();
dataSource.close();
}
@Test
@ -99,7 +132,7 @@ class DomainLinksLoaderServiceTest {
SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK
""")
) {
var domainService = new DomainLoaderService(dataSource);
var domainService = new DomainLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID()));
var input = new LoaderInputData(workDir, 2);
var domainRegistry = domainService.getOrCreateDomainIds(input);

View File

@ -5,7 +5,6 @@ import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.app.svc.*;
import nu.marginalia.control.node.svc.ControlNodeActionsService;
import nu.marginalia.control.node.svc.ControlActorService;
import nu.marginalia.control.node.svc.ControlFileStorageService;
import nu.marginalia.control.node.svc.ControlNodeService;
import nu.marginalia.control.sys.svc.ControlSysActionsService;
@ -44,7 +43,6 @@ public class ControlService extends Service {
HeartbeatService heartbeatService,
EventLogService eventLogService,
RendererFactory rendererFactory,
ControlActorService controlActorService,
StaticResources staticResources,
MessageQueueService messageQueueService,
ControlFileStorageService controlFileStorageService,
@ -71,7 +69,6 @@ public class ControlService extends Service {
// node
controlFileStorageService.register();
controlActorService.register();
nodeActionsService.register();
controlNodeService.register();

View File

@ -0,0 +1,9 @@
package nu.marginalia.control.actor;
public enum ControlActor {
REBALANCE;
public String id() {
return "fsm:" + name().toLowerCase();
}
}

View File

@ -0,0 +1,111 @@
package nu.marginalia.control.actor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.actor.ActorStateMachine;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.control.actor.rebalance.RebalanceActor;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
@Singleton
public class ControlActorService {
private final ServiceEventLog eventLog;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<ControlActor, ActorStateMachine> stateMachines = new HashMap<>();
public Map<ControlActor, AbstractActorPrototype> actorDefinitions = new HashMap<>();
private final int node;
@Inject
public ControlActorService(MessageQueueFactory messageQueueFactory,
BaseServiceParams baseServiceParams,
RebalanceActor rebalanceActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.gson = GsonFactory.get();
this.node = baseServiceParams.configuration.node();
register(ControlActor.REBALANCE, rebalanceActor);
}
private void register(ControlActor process, AbstractActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
actorDefinitions.put(process, graph);
}
private void logStateChange(ControlActor process, String state) {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(ControlActor process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(ControlActor process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(ControlActor process, String state, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
}
public <T> void startFromJSON(ControlActor process, String state, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, json);
}
public <T> void start(ControlActor process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
public <T> void startJSON(ControlActor process, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(json);
}
@SneakyThrows
public void stop(ControlActor process) {
eventLog.logEvent("FSM-STOP", process.id());
stateMachines.get(process).abortExecution();
}
public Map<ControlActor, ActorStateInstance> getActorStates() {
return stateMachines.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState())
);
}
public boolean isDirectlyInitializable(ControlActor actor) {
return actorDefinitions.get(actor).isDirectlyInitializable();
}
public AbstractActorPrototype getActorDefinition(ControlActor actor) {
return actorDefinitions.get(actor);
}
}

View File

@ -0,0 +1,169 @@
package nu.marginalia.control.actor.rebalance;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import org.jetbrains.annotations.NotNull;
import java.sql.SQLException;
import java.util.*;
public class RebalanceActor extends AbstractActorPrototype {
// States
public static final String INIT = "INIT";
public static final String CALCULATE_TRANSACTIONS = "CALCULATE_TRANSACTIONS";
public static final String END = "END";
private final NodeConfigurationService nodeConfigurationService;
private final MqPersistence mqPersistence;
private final HikariDataSource dataSource;
@Override
public String describe() {
return "Rebalances crawl data among the nodes";
}
@Inject
public RebalanceActor(ActorStateFactory stateFactory,
NodeConfigurationService nodeConfigurationService,
MqPersistence mqPersistence, HikariDataSource dataSource)
{
super(stateFactory);
this.nodeConfigurationService = nodeConfigurationService;
this.mqPersistence = mqPersistence;
this.dataSource = dataSource;
}
@ActorState(name= INIT, next = CALCULATE_TRANSACTIONS, resume = ActorResumeBehavior.ERROR,
description = "Fetches the number of domains assigned to each eligible processing node")
public Map<Integer, Integer> getPopulations() throws Exception {
return getNodePopulations();
}
@ActorState(name= CALCULATE_TRANSACTIONS, next = END, resume = ActorResumeBehavior.ERROR,
description = "Calculates how many domains to re-assign between the processing nodes"
)
public List<Give> calculateTransactions(Map<Integer, Integer> populations) {
if (populations.size() <= 1) {
transition(END);
}
int average = (int) populations.values().stream().mapToInt(Integer::valueOf).average().orElse(0);
int tolerance = average / 10;
PriorityQueue<Sur> surplusList = new PriorityQueue<>();
PriorityQueue<Def> deficitList = new PriorityQueue<>();
populations.forEach((node, count) -> {
int delta = count - average;
if (delta - tolerance > 0) {
surplusList.add(new Sur(node, delta));
}
else if (delta + tolerance < 0) {
deficitList.add(new Def(node, -delta));
}
});
List<Give> actions = new ArrayList<>();
while (!surplusList.isEmpty() && !deficitList.isEmpty()) {
var sur = surplusList.poll();
var def = deficitList.poll();
assert (sur.n != def.n);
int amount = Math.min(sur.c, def.c);
actions.add(new Give(sur.n, def.n, amount));
if (sur.c - amount > tolerance) {
surplusList.add(new Sur(sur.n, sur.c - amount));
}
if (def.c - amount > tolerance) {
deficitList.add(new Def(def.n, def.c - amount));
}
}
return actions;
}
private Map<Integer, Integer> getNodePopulations() throws SQLException {
Map<Integer, Integer> ret = new HashMap<>();
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT NODE_AFFINITY, COUNT(*)
FROM EC_DOMAIN
WHERE NODE_AFFINITY > 0
GROUP BY NODE_AFFINITY
""")) {
var rs = query.executeQuery();
while (rs.next()) {
ret.put(rs.getInt(1), rs.getInt(2));
}
}
for (var node : nodeConfigurationService.getAll()) {
if (isNodeExcluded(node)) {
ret.remove(node.node());
} else {
ret.putIfAbsent(node.node(), 0);
}
}
return ret;
}
private boolean isNodeExcluded(NodeConfiguration node) {
return node.disabled();
}
/* 1. calculate sizes for each node using db

   2. rebalance

   -- find average
   -- calculate surplus and deficit, with a NN% tolerance
   -- create instructions for redistribution

   3. instruct each executor to transfer data:

   -- transfer domain data
   -- append to receiver crawler log
   -- instruct donor to delete file

   4. regenerate crawler logs based on present files on all donor nodes */
public record Sur(int n, int c) implements Comparable<Sur> {
@Override
public int compareTo(@NotNull RebalanceActor.Sur o) {
int d = Integer.compare(o.c, c);
if (d != 0)
return d;
return Integer.compare(n, o.n);
}
}
public record Def(int n, int c) implements Comparable<Def> {
@Override
public int compareTo(@NotNull RebalanceActor.Def o) {
int d = Integer.compare(o.c, c);
if (d != 0)
return d;
return Integer.compare(n, o.n);
}
}
public record Give(int donor, int dest, int c) {
}
}
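A worked pass through calculateTransactions, matching the first unit test further down: populations {1: 100, 2: 0} give average = 50 and tolerance = 5; node 1 enters the surplus queue with +50, node 2 the deficit queue with 50, and a single Give(1, 2, 50) settles both within tolerance. The Sur and Def records order their priority queues largest-count-first (compareTo compares o.c against c), so the biggest imbalances are matched first.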

View File

@ -1,103 +0,0 @@
package nu.marginalia.control.node.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.client.Context;
import nu.marginalia.control.Redirects;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.executor.model.ActorRunState;
import spark.Request;
import spark.Response;
import spark.Spark;
import java.util.List;
@Singleton
public class ControlActorService {
private final ExecutorClient executorClient;
@Inject
public ControlActorService(ExecutorClient executorClient) {
this.executorClient = executorClient;
}
public void register() {
Spark.post("/public/nodes/:node/storage/:fid/crawl", this::triggerCrawling, Redirects.redirectToActors);
Spark.post("/public/nodes/:node/storage/:fid/process", this::triggerProcessing, Redirects.redirectToActors);
Spark.post("/public/nodes/:node/storage/:fid/process-and-load", this::triggerProcessingWithLoad, Redirects.redirectToActors);
Spark.post("/public/nodes/:node/storage/:fid/load", this::loadProcessedData, Redirects.redirectToActors);
Spark.post("/public/nodes/:node/storage/:fid/restore-backup", this::restoreBackup, Redirects.redirectToActors);
Spark.post("/public/nodes/:node/storage/specs", this::createCrawlSpecification, Redirects.redirectToStorage);
Spark.post("/public/nodes/:node/fsms/:fsm/start", this::startFsm, Redirects.redirectToActors);
Spark.post("/public/nodes/:node/fsms/:fsm/stop", this::stopFsm, Redirects.redirectToActors);
}
public Object startFsm(Request req, Response rsp) throws Exception {
executorClient.startFsm(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fsm").toUpperCase());
return "";
}
public Object stopFsm(Request req, Response rsp) throws Exception {
executorClient.stopFsm(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fsm").toUpperCase());
return "";
}
public Object triggerCrawling(Request req, Response response) throws Exception {
executorClient.triggerCrawl(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid"));
return "";
}
public Object triggerProcessing(Request req, Response response) throws Exception {
executorClient.triggerConvert(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid"));
return "";
}
public Object triggerProcessingWithLoad(Request req, Response response) throws Exception {
executorClient.triggerProcessAndLoad(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid"));
return "";
}
public Object loadProcessedData(Request req, Response response) throws Exception {
executorClient.loadProcessedData(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid"));
return "";
}
public List<ActorRunState> getActorStates(Request req) {
return executorClient.getActorStates(Context.fromRequest(req), Integer.parseInt(req.params("node"))).states();
}
public Object createCrawlSpecification(Request request, Response response) throws Exception {
final String description = request.queryParams("description");
final String url = request.queryParams("url");
final String source = request.queryParams("source");
if ("db".equals(source)) {
executorClient.createCrawlSpecFromDb(Context.fromRequest(request), 0, description);
}
else if ("download".equals(source)) {
executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), 0, description, url);
}
else {
throw new IllegalArgumentException("Unknown source: " + source);
}
return "";
}
public Object restoreBackup(Request req, Response response) throws Exception {
executorClient.restoreBackup(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid"));
return "";
}
}

View File

@ -2,7 +2,9 @@ package nu.marginalia.control.node.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.client.Context;
import nu.marginalia.control.Redirects;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
@ -19,16 +21,17 @@ import java.sql.SQLException;
@Singleton
public class ControlFileStorageService {
private final FileStorageService fileStorageService;
private final ExecutorClient executorClient;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public ControlFileStorageService( FileStorageService fileStorageService)
public ControlFileStorageService(FileStorageService fileStorageService, ExecutorClient executorClient)
{
this.fileStorageService = fileStorageService;
this.executorClient = executorClient;
}
public void register() throws IOException {
Spark.get("/public/storage/:id/file", this::downloadFileFromStorage);
Spark.post("/public/storage/:fid/delete", this::flagFileForDeletionRequest, Redirects.redirectToStorage);
}
@ -39,29 +42,5 @@ public class ControlFileStorageService {
return "";
}
public Object downloadFileFromStorage(Request request, Response response) throws SQLException {
var fileStorageId = FileStorageId.parse(request.params("id"));
String filename = request.queryParams("name");
Path root = fileStorageService.getStorage(fileStorageId).asPath();
Path filePath = root.resolve(filename).normalize();
if (!filePath.startsWith(root)) {
response.status(403);
return "";
}
if (filePath.endsWith(".txt") || filePath.endsWith(".log")) response.type("text/plain");
else response.type("application/octet-stream");
try (var is = Files.newInputStream(filePath)) {
is.transferTo(response.raw().getOutputStream());
}
catch (IOException ex) {
logger.error("Failed to download file", ex);
throw new RuntimeException(ex);
}
return "";
}
}

View File

@ -14,7 +14,6 @@ import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.executor.model.crawl.RecrawlParameters;
import nu.marginalia.executor.model.load.LoadParameters;
import nu.marginalia.renderer.RendererFactory;
import nu.marginalia.service.id.ServiceId;
@ -25,9 +24,7 @@ import spark.Request;
import spark.Response;
import spark.Spark;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
@ -102,6 +99,7 @@ public class ControlNodeService {
Spark.post("/public/nodes/:id/storage/:fid/delete", this::deleteFileStorage);
Spark.post("/public/nodes/:id/storage/:fid/enable", this::enableFileStorage);
Spark.post("/public/nodes/:id/storage/:fid/disable", this::disableFileStorage);
Spark.get("/public/nodes/:id/storage/:fid/transfer", this::downloadFileFromStorage);
}
@ -145,10 +143,7 @@ public class ControlNodeService {
final String source = request.queryParams("source");
int nodeId = Integer.parseInt(request.params("id"));
if ("db".equals(source)) {
executorClient.createCrawlSpecFromDb(Context.fromRequest(request), nodeId, description);
}
else if ("download".equals(source)) {
if ("download".equals(source)) {
executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), nodeId, description, url);
}
else {
@ -174,11 +169,9 @@ public class ControlNodeService {
if (toCrawl.size() != 1)
throw new IllegalStateException();
var specs = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_SPEC);
executorClient.triggerRecrawl(Context.fromRequest(request),
nodeId,
new RecrawlParameters(toCrawl.get(0), specs));
toCrawl.get(0));
return redirectToOverview(request);
}
@ -360,6 +353,24 @@ public class ControlNodeService {
);
}
public Object downloadFileFromStorage(Request request, Response response) throws IOException {
int nodeId = Integer.parseInt(request.params("id"));
var fileStorageId = FileStorageId.parse(request.params("fid"));
String path = request.queryParams("path");
response.header("content-disposition", "attachment; filename=\""+path+"\"");
if (path.endsWith(".txt") || path.endsWith(".log"))
response.type("text/plain");
else
response.type("application/octet-stream");
executorClient.transferFile(Context.fromRequest(request), nodeId, fileStorageId, path, response.raw().getOutputStream());
return "";
}
private Object getStorageBaseList(int nodeId) throws SQLException {
List<FileStorageBase> bases = new ArrayList<>();

View File

@ -8,7 +8,9 @@
</tr>
{{#each files}}
<tr>
<td> {{filename}} </td>
<td>
<a href="/nodes/{{node.id}}/storage/{{self.storage.id}}/transfer?path={{filename}}">{{filename}}</a>
</td>
<td>{{mTime}}</td>
<td>{{size}}</td>
</tr>

View File

@ -0,0 +1,64 @@
package nu.marginalia.control.actor.rebalance;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.junit.Assert.assertEquals;
class RebalanceActorTest {
RebalanceActor actor = new RebalanceActor(null, null, null, null);
@Test
void calculateTransactions1_2() {
var transactions = actor.calculateTransactions(Map.of(1, 100, 2, 0));
var expected = List.of(new RebalanceActor.Give(1, 2, 50));
Assertions.assertEquals(expected, transactions);
}
@Test
void calculateTransactions1_3() {
var transactions = actor.calculateTransactions(Map.of(1, 90, 2, 0, 3, 0));
var expected = List.of(
new RebalanceActor.Give(1, 2, 30),
new RebalanceActor.Give(1, 3, 30)
);
Assertions.assertEquals(expected, transactions);
}
@Test
void calculateTransactions2_3() {
var transactions = actor.calculateTransactions(Map.of(1, 30, 2, 30, 3, 0));
var expected = List.of(
new RebalanceActor.Give(1, 3, 10),
new RebalanceActor.Give(2, 3, 10)
);
Assertions.assertEquals(expected, transactions);
}
@Test
void calculateTransactionsEmpty() {
try {
actor.calculateTransactions(Map.of());
Assertions.fail("Expected transition");
}
catch (AbstractActorPrototype.ControlFlowException ex) {
Assertions.assertEquals("END", ex.getState());
}
try {
actor.calculateTransactions(Map.of(1, 100));
Assertions.fail("Expected transition");
}
catch (AbstractActorPrototype.ControlFlowException ex) {
Assertions.assertEquals("END", ex.getState());
}
}
}

View File

@ -24,6 +24,7 @@ java {
dependencies {
implementation project(':code:common:config')
implementation project(':code:common:model')
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:service')

View File

@ -10,15 +10,15 @@ import spark.Spark;
@Singleton
public class ActorApi {
private final ActorControlService actors;
private final ExecutorActorControlService actors;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public ActorApi(ActorControlService actors) {
public ActorApi(ExecutorActorControlService actors) {
this.actors = actors;
}
public Object startActorFromState(Request request, Response response) throws Exception {
Actor actor = translateActor(request.params("id"));
ExecutorActor actor = translateActor(request.params("id"));
String state = request.params("state");
actors.startFromJSON(actor, state, request.body());
@ -27,7 +27,7 @@ public class ActorApi {
}
public Object startActor(Request request, Response response) throws Exception {
Actor actor = translateActor(request.params("id"));
ExecutorActor actor = translateActor(request.params("id"));
actors.startJSON(actor, request.body());
@ -35,16 +35,16 @@ public class ActorApi {
}
public Object stopActor(Request request, Response response) {
Actor actor = translateActor(request.params("id"));
ExecutorActor actor = translateActor(request.params("id"));
actors.stop(actor);
return "OK";
}
public Actor translateActor(String name) {
public ExecutorActor translateActor(String name) {
try {
return Actor.valueOf(name.toUpperCase());
return ExecutorActor.valueOf(name.toUpperCase());
}
catch (IllegalArgumentException ex) {
logger.error("Unknown actor {}", name);

View File

@ -1,145 +0,0 @@
package nu.marginalia.actor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.actor.monitor.*;
import nu.marginalia.actor.proc.*;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.task.*;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/** This class is responsible for starting and stopping the various actors in the responsible service */
@Singleton
public class ActorControlService {
private final ServiceEventLog eventLog;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<Actor, ActorStateMachine> stateMachines = new HashMap<>();
public Map<Actor, AbstractActorPrototype> actorDefinitions = new HashMap<>();
private final int node;
@Inject
public ActorControlService(MessageQueueFactory messageQueueFactory,
BaseServiceParams baseServiceParams,
ConvertActor convertActor,
ConvertAndLoadActor convertAndLoadActor,
CrawlActor crawlActor,
RecrawlActor recrawlActor,
RestoreBackupActor restoreBackupActor,
ConverterMonitorActor converterMonitorFSM,
CrawlerMonitorActor crawlerMonitorActor,
LoaderMonitorActor loaderMonitor,
MessageQueueMonitorActor messageQueueMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
FileStorageMonitorActor fileStorageMonitorActor,
IndexConstructorMonitorActor indexConstructorMonitorActor,
TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor,
CrawlJobExtractorActor crawlJobExtractorActor,
ExportDataActor exportDataActor,
TruncateLinkDatabase truncateLinkDatabase
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.gson = GsonFactory.get();
this.node = baseServiceParams.configuration.node();
register(Actor.CRAWL, crawlActor);
register(Actor.RECRAWL, recrawlActor);
register(Actor.CONVERT, convertActor);
register(Actor.RESTORE_BACKUP, restoreBackupActor);
register(Actor.CONVERT_AND_LOAD, convertAndLoadActor);
register(Actor.PROC_INDEX_CONSTRUCTOR_SPAWNER, indexConstructorMonitorActor);
register(Actor.PROC_CONVERTER_SPAWNER, converterMonitorFSM);
register(Actor.PROC_LOADER_SPAWNER, loaderMonitor);
register(Actor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor);
register(Actor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor);
register(Actor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
register(Actor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);
register(Actor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor);
register(Actor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor);
register(Actor.EXPORT_DATA, exportDataActor);
register(Actor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase);
}
private void register(Actor process, AbstractActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
actorDefinitions.put(process, graph);
}
private void logStateChange(Actor process, String state) {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(Actor process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(Actor process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(Actor process, String state, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
}
public <T> void startFromJSON(Actor process, String state, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, json);
}
public <T> void start(Actor process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
public <T> void startJSON(Actor process, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(json);
}
@SneakyThrows
public void stop(Actor process) {
eventLog.logEvent("FSM-STOP", process.id());
stateMachines.get(process).abortExecution();
}
public Map<Actor, ActorStateInstance> getActorStates() {
return stateMachines.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState())
);
}
public boolean isDirectlyInitializable(Actor actor) {
return actorDefinitions.get(actor).isDirectlyInitializable();
}
public AbstractActorPrototype getActorDefinition(Actor actor) {
return actorDefinitions.get(actor);
}
}

View File

@ -1,6 +1,6 @@
package nu.marginalia.actor;
public enum Actor {
public enum ExecutorActor {
CRAWL,
RECRAWL,
CONVERT_AND_LOAD,
@ -16,6 +16,7 @@ public enum Actor {
TRUNCATE_LINK_DATABASE,
PROC_INDEX_CONSTRUCTOR_SPAWNER,
CONVERT,
TRANSFER_DOMAINS,
RESTORE_BACKUP;
public String id() {

View File

@ -0,0 +1,147 @@
package nu.marginalia.actor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.actor.monitor.*;
import nu.marginalia.actor.proc.*;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.task.*;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
/** This class is responsible for starting and stopping the various actors in the responsible service */
@Singleton
public class ExecutorActorControlService {
private final ServiceEventLog eventLog;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<ExecutorActor, ActorStateMachine> stateMachines = new HashMap<>();
public Map<ExecutorActor, AbstractActorPrototype> actorDefinitions = new HashMap<>();
private final int node;
@Inject
public ExecutorActorControlService(MessageQueueFactory messageQueueFactory,
BaseServiceParams baseServiceParams,
ConvertActor convertActor,
ConvertAndLoadActor convertAndLoadActor,
CrawlActor crawlActor,
RecrawlActor recrawlActor,
RestoreBackupActor restoreBackupActor,
ConverterMonitorActor converterMonitorFSM,
CrawlerMonitorActor crawlerMonitorActor,
LoaderMonitorActor loaderMonitor,
MessageQueueMonitorActor messageQueueMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
FileStorageMonitorActor fileStorageMonitorActor,
IndexConstructorMonitorActor indexConstructorMonitorActor,
TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor,
CrawlJobExtractorActor crawlJobExtractorActor,
ExportDataActor exportDataActor,
TruncateLinkDatabase truncateLinkDatabase,
TransferDomainsActor transferDomainsActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
this.gson = GsonFactory.get();
this.node = baseServiceParams.configuration.node();
register(ExecutorActor.CRAWL, crawlActor);
register(ExecutorActor.RECRAWL, recrawlActor);
register(ExecutorActor.CONVERT, convertActor);
register(ExecutorActor.RESTORE_BACKUP, restoreBackupActor);
register(ExecutorActor.CONVERT_AND_LOAD, convertAndLoadActor);
register(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER, indexConstructorMonitorActor);
register(ExecutorActor.PROC_CONVERTER_SPAWNER, converterMonitorFSM);
register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor);
register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor);
register(ExecutorActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor);
register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM);
register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor);
register(ExecutorActor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor);
register(ExecutorActor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor);
register(ExecutorActor.EXPORT_DATA, exportDataActor);
register(ExecutorActor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase);
register(ExecutorActor.TRANSFER_DOMAINS, transferDomainsActor);
}
private void register(ExecutorActor process, AbstractActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
actorDefinitions.put(process, graph);
}
private void logStateChange(ExecutorActor process, String state) {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(ExecutorActor process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(ExecutorActor process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(ExecutorActor process, String state, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
}
public <T> void startFromJSON(ExecutorActor process, String state, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, json);
}
public <T> void start(ExecutorActor process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
public <T> void startJSON(ExecutorActor process, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(json);
}
@SneakyThrows
public void stop(ExecutorActor process) {
eventLog.logEvent("FSM-STOP", process.id());
stateMachines.get(process).abortExecution();
}
public Map<ExecutorActor, ActorStateInstance> getActorStates() {
return stateMachines.entrySet().stream().collect(
Collectors.toMap(
Map.Entry::getKey, e -> e.getValue().getState())
);
}
public boolean isDirectlyInitializable(ExecutorActor actor) {
return actorDefinitions.get(actor).isDirectlyInitializable();
}
public AbstractActorPrototype getActorDefinition(ExecutorActor actor) {
return actorDefinitions.get(actor);
}
}
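
For orientation, a minimal usage sketch of the control service above; this is not part of the commit. The actor ids, state name, and FileStorageId argument mirror calls that appear elsewhere in this diff, while the surrounding class and method are hypothetical scaffolding.

```java
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.storage.model.FileStorageId;

class StartActorSketch {
    // 'actors' is assumed to be injected by Guice, as in the services further down.
    void run(ExecutorActorControlService actors) throws Exception {
        // Start from the initial state; the argument is serialized to JSON via Gson
        actors.start(ExecutorActor.CRAWL, new FileStorageId(1));

        // Start from a named intermediate state instead
        actors.startFrom(ExecutorActor.CONVERT, "CONVERT", new FileStorageId(1));

        // Or hand over pre-serialized JSON directly
        actors.startJSON(ExecutorActor.CRAWL, "\"\"");
    }
}
```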

View File

@@ -9,6 +9,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.db.DbDomainStatsExportMultitool;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.storage.model.FileStorageType;
@@ -28,20 +29,15 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES
public static final String CREATE_FROM_DB = "CREATE_FROM_DB";
public static final String CREATE_FROM_LINK = "CREATE_FROM_LINK";
public static final String END = "END";
private final FileStorageService fileStorageService;
private final HikariDataSource dataSource;
@Inject
public CrawlJobExtractorActor(ActorStateFactory stateFactory,
FileStorageService fileStorageService,
HikariDataSource dataSource
FileStorageService fileStorageService
) {
super(stateFactory);
this.fileStorageService = fileStorageService;
this.dataSource = dataSource;
}
public record CrawlJobExtractorArguments(String description) { }
@@ -90,35 +86,4 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype {
);
}
@ActorState(name = CREATE_FROM_DB, next = END,
resume = ActorResumeBehavior.ERROR,
description = """
Spawns a CrawlJobExtractor process that loads data from the link database, and waits for it to finish.
"""
)
public void createFromDB(CrawlJobExtractorArguments arg) throws Exception {
if (arg == null) {
error("This actor requires a CrawlJobExtractorArguments argument");
}
var base = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE);
var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description());
final Path path = CrawlSpecFileNames.resolve(storage);
try (var dbTools = new DbDomainStatsExportMultitool(dataSource)) {
generateCrawlSpec(
path,
DomainSource.combined(
DomainSource.knownUrlsFromDb(dbTools),
DomainSource.fromCrawlQueue(dbTools)
),
KnownUrlsCountSource.fromDb(dbTools, 200),
KnownUrlsListSource.justIndex() // TODO: hook in linkdb maybe?
);
}
}
}

View File

@@ -13,17 +13,14 @@ import nu.marginalia.actor.state.ActorState;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest;
import org.jetbrains.annotations.NotNull;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
@Singleton
public class RecrawlActor extends AbstractActorPrototype {
@@ -42,8 +39,12 @@ public class RecrawlActor extends AbstractActorPrototype {
@AllArgsConstructor @With @NoArgsConstructor
public static class RecrawlMessage {
public List<FileStorageId> crawlSpecId = null;
public FileStorageId crawlStorageId = null;
/** The storage where the crawl data will be stored. If this contains existing crawl
* data, it will be consulted for e.g. ETag comparisons.
*/
@NotNull
public FileStorageId crawlStorageId;
public long crawlerMsgId = 0L;
};
@@ -52,8 +53,8 @@ public class RecrawlActor extends AbstractActorPrototype {
return "Run the crawler with the given crawl spec using previous crawl data for a reference";
}
public static RecrawlMessage recrawlFromCrawlDataAndCralSpec(FileStorageId crawlData, List<FileStorageId> crawlSpec) {
return new RecrawlMessage(crawlSpec, crawlData, 0L);
public static RecrawlMessage recrawlFromCrawlDataAndCrawlSpec(FileStorageId crawlData) {
return new RecrawlMessage(crawlData, 0L);
}
@Inject
@@ -83,28 +84,12 @@ public class RecrawlActor extends AbstractActorPrototype {
var crawlStorage = storageService.getStorage(recrawlMessage.crawlStorageId);
for (var specs : recrawlMessage.crawlSpecId) {
FileStorage specStorage = storageService.getStorage(specs);
if (specStorage == null) error("Bad storage id");
if (specStorage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + specStorage.type());
}
if (crawlStorage == null) error("Bad storage id");
if (crawlStorage.type() != FileStorageType.CRAWL_DATA) error("Bad storage type " + crawlStorage.type());
Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));
return recrawlMessage
.withCrawlSpecId(recrawlMessage.crawlSpecId);
}
private Optional<FileStorage> getSpec(FileStorage crawlStorage) throws SQLException {
return storageService.getSourceFromStorage(crawlStorage)
.stream()
.filter(storage -> storage.type().equals(FileStorageType.CRAWL_SPEC))
.findFirst();
return recrawlMessage;
}
@ActorState(name = CRAWL,
@@ -117,7 +102,7 @@ public class RecrawlActor extends AbstractActorPrototype {
public RecrawlMessage crawl(RecrawlMessage recrawlMessage) throws Exception {
// Pre-send crawl request
var request = new CrawlRequest(recrawlMessage.crawlSpecId, recrawlMessage.crawlStorageId);
var request = new CrawlRequest(null, recrawlMessage.crawlStorageId);
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
return recrawlMessage.withCrawlerMsgId(id);

View File

@@ -5,7 +5,7 @@ import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.Actor;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.svc.BackupService;
import nu.marginalia.storage.model.FileStorageId;
@@ -43,7 +43,7 @@ public class RestoreBackupActor extends AbstractActorPrototype {
backupService.restoreBackup(id);
mqPersistence.sendNewMessage(
Actor.CONVERT_AND_LOAD.id() + ":" + node,
ExecutorActor.CONVERT_AND_LOAD.id() + ":" + node,
null,
null,
ConvertAndLoadActor.REPARTITION,

View File

@@ -0,0 +1,182 @@
package nu.marginalia.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.client.Context;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.UUID;
import java.util.zip.GZIPOutputStream;
@Singleton
public class TransferDomainsActor extends AbstractActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String TRANSFER_DOMAINS = "TRANSFER-DOMAINS";
public static final String UPDATE_DONOR_LOG = "UPDATE_DONOR_LOG";
public static final String END = "END";
private final FileStorageService storageService;
private final ExecutorClient executorClient;
private final MqPersistence persistence;
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int nodeId;
private final String executorServiceName;
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
int sourceNode;
int count;
};
@Override
public String describe() {
return "Transfers domains between nodes' crawl data sets";
}
@Inject
public TransferDomainsActor(ActorStateFactory stateFactory,
ServiceConfiguration configuration,
FileStorageService storageService,
ExecutorClient executorClient,
MqPersistence persistence,
HikariDataSource dataSource)
{
super(stateFactory);
this.storageService = storageService;
this.executorClient = executorClient;
this.persistence = persistence;
this.dataSource = dataSource;
this.nodeId = configuration.node();
this.executorServiceName = configuration.serviceName();
}
@ActorState(name = INITIAL,
next = TRANSFER_DOMAINS,
description = """
Ensure preconditions are met
""")
public Message init(Message message) throws Exception {
var storages = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA);
// Ensure crawl data exists to receive into
if (storages.isEmpty()) {
var storage = storageService.allocateTemporaryStorage(
storageService.getStorageBase(FileStorageBaseType.STORAGE),
FileStorageType.CRAWL_DATA,
"crawl-data",
"Crawl Data"
);
storageService.enableFileStorage(storage.id());
}
return message;
}
@ActorState(name = TRANSFER_DOMAINS,
next = UPDATE_DONOR_LOG,
resume = ActorResumeBehavior.ERROR,
description = """
Fetch the domains and their crawl data from the source node, updating each domain's node affinity as it is received
"""
)
public Message transferData(Message message) throws Exception {
var storageId = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA).get(0);
var storage = storageService.getStorage(storageId);
var spec = executorClient.getTransferSpec(Context.internal(), message.sourceNode, message.count);
if (spec.size() == 0) {
transition("END", "NOTHING TO TRANSFER");
}
Path basePath = storage.asPath();
try (var workLog = new WorkLog(basePath.resolve("crawler.log"));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE ID=?");
) {
for (var item : spec.items()) {
logger.info("{}", item);
logger.info("Transferring {}", item.domainName());
Path dest = basePath.resolve(item.path());
Files.createDirectories(dest.getParent());
try (var fileStream = Files.newOutputStream(dest)) {
executorClient.transferFile(Context.internal(),
message.sourceNode,
item.fileStorageId(),
item.path(),
fileStream);
stmt.setInt(1, nodeId);
stmt.setInt(2, item.domainId());
stmt.executeUpdate();
executorClient.yieldDomain(Context.internal(), message.sourceNode, item);
workLog.setJobToFinished(item.domainName(), item.path(), 1);
}
catch (IOException ex) {
Files.deleteIfExists(dest);
error(ex);
}
catch (Exception ex) {
error(ex);
}
}
}
return message;
}
@ActorState(name = UPDATE_DONOR_LOG,
next = END,
resume = ActorResumeBehavior.ERROR,
description = """
Instruct the donor node to prune the transferred domains from its crawler log
"""
)
public void updateDonorLog(Message message) throws InterruptedException {
var outbox = new MqOutbox(persistence, executorServiceName, message.sourceNode,
getClass().getSimpleName(), nodeId, UUID.randomUUID());
try {
outbox.send("PRUNE-CRAWL-DATA", ":-)");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
outbox.stop();
}
}
}
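
As a hedged illustration of how this actor might be launched: the caller below is hypothetical, but the actor id and the Message(sourceNode, count) shape come from the code above.

```java
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.TransferDomainsActor;

class TransferSketch {
    // Pull up to 50 domains from node 1 into this node's crawl data set.
    void transferFromDonor(ExecutorActorControlService actors) throws Exception {
        actors.start(ExecutorActor.TRANSFER_DOMAINS,
                new TransferDomainsActor.Message(1, 50));
    }
}
```

The INITIAL state then allocates crawl-data storage if none is active, TRANSFER-DOMAINS streams each domain's data across and flips its NODE_AFFINITY, and UPDATE_DONOR_LOG asks the donor to prune its crawler log.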

View File

@@ -2,24 +2,22 @@ package nu.marginalia.executor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import lombok.SneakyThrows;
import nu.marginalia.actor.Actor;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ActorApi;
import nu.marginalia.actor.ActorControlService;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.executor.model.ActorRunState;
import nu.marginalia.executor.model.ActorRunStates;
import nu.marginalia.executor.storage.FileStorageContent;
import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.executor.svc.BackupService;
import nu.marginalia.executor.svc.ProcessingService;
import nu.marginalia.executor.svc.SideloadService;
import nu.marginalia.executor.svc.TransferService;
import nu.marginalia.service.server.BaseServiceParams;
import nu.marginalia.service.server.Service;
import nu.marginalia.service.server.mq.MqNotification;
import nu.marginalia.service.server.mq.MqRequest;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
@@ -27,37 +25,35 @@ import spark.Response;
import spark.Spark;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
// Deliberately weird name, to avoid clashing with java.util.concurrent.ExecutorService
public class ExecutorSvc extends Service {
private final BaseServiceParams params;
private final ActorControlService actorControlService;
private final ExecutorActorControlService actorControlService;
private final FileStorageService fileStorageService;
private final TransferService transferService;
private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class);
@Inject
public ExecutorSvc(BaseServiceParams params,
ActorControlService actorControlService,
ExecutorActorControlService actorControlService,
ProcessingService processingService,
SideloadService sideloadService,
BackupService backupService,
FileStorageService fileStorageService,
Gson gson,
TransferService transferService,
ActorApi actorApi) {
super(params);
this.params = params;
this.actorControlService = actorControlService;
this.fileStorageService = fileStorageService;
this.transferService = transferService;
Spark.post("/actor/:id/start", actorApi::startActor);
Spark.post("/actor/:id/start/:state", actorApi::startActorFromState);
@@ -68,7 +64,6 @@ public class ExecutorSvc extends Service {
Spark.post("/process/recrawl", processingService::startRecrawl);
Spark.post("/process/convert/:fid", processingService::startConversion);
Spark.post("/process/convert-load/:fid", processingService::startConvertLoad);
Spark.post("/process/crawl-spec/from-db", processingService::createCrawlSpecFromDb);
Spark.post("/process/crawl-spec/from-download", processingService::createCrawlSpecFromDownload);
Spark.post("/process/load", processingService::startLoad);
Spark.post("/process/adjacency-calculation", processingService::startAdjacencyCalculation);
@@ -78,47 +73,30 @@ public class ExecutorSvc extends Service {
Spark.post("/sideload/encyclopedia", sideloadService::sideloadEncyclopedia);
Spark.post("/backup/:fid/restore", backupService::restore);
Spark.get("/storage/:fid", this::listFiles, gson::toJson);
Spark.get("/storage/:fid", transferService::listFiles, gson::toJson);
Spark.get("/transfer/file/:fid", transferService::transferFile);
Spark.get("/transfer/spec", transferService::getTransferSpec, gson::toJson);
Spark.post("/transfer/yield", transferService::yieldDomain);
}
@MqNotification(endpoint="FIRST-BOOT")
public void setUpDefaultActors(String message) throws Exception {
logger.info("Initializing default actors");
actorControlService.start(Actor.MONITOR_PROCESS_LIVENESS);
actorControlService.start(Actor.MONITOR_FILE_STORAGE);
actorControlService.start(Actor.MONITOR_MESSAGE_QUEUE);
actorControlService.start(Actor.PROC_CONVERTER_SPAWNER);
actorControlService.start(Actor.PROC_CRAWLER_SPAWNER);
actorControlService.start(Actor.PROC_INDEX_CONSTRUCTOR_SPAWNER);
actorControlService.start(Actor.PROC_LOADER_SPAWNER);
actorControlService.start(ExecutorActor.MONITOR_PROCESS_LIVENESS);
actorControlService.start(ExecutorActor.MONITOR_FILE_STORAGE);
actorControlService.start(ExecutorActor.MONITOR_MESSAGE_QUEUE);
actorControlService.start(ExecutorActor.PROC_CONVERTER_SPAWNER);
actorControlService.start(ExecutorActor.PROC_CRAWLER_SPAWNER);
actorControlService.start(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER);
actorControlService.start(ExecutorActor.PROC_LOADER_SPAWNER);
}
@MqRequest(endpoint="PRUNE-CRAWL-DATA")
public String pruneCrawlData(String message) throws SQLException, IOException {
transferService.pruneCrawlDataMqEndpoint();
private FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException {
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
var storage = fileStorageService.getStorage(fileStorageId);
List<FileStorageFile> files;
try (var fs = Files.list(storage.asPath())) {
files = fs.filter(Files::isRegularFile)
.map(this::createFileModel)
.sorted(Comparator.comparing(FileStorageFile::name))
.toList();
}
return new FileStorageContent(files);
}
@SneakyThrows
private FileStorageFile createFileModel(Path path) {
return new FileStorageFile(
path.toFile().getName(),
Files.size(path),
Files.getLastModifiedTime(path).toInstant().toString()
);
return "OK";
}

View File

@@ -1,24 +1,24 @@
package nu.marginalia.executor.svc;
import com.google.inject.Inject;
import nu.marginalia.actor.Actor;
import nu.marginalia.actor.ActorControlService;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.RestoreBackupActor;
import nu.marginalia.storage.model.FileStorageId;
import spark.Request;
import spark.Response;
public class BackupService {
private final ActorControlService actorControlService;
private final ExecutorActorControlService actorControlService;
@Inject
public BackupService(ActorControlService actorControlService) {
public BackupService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public Object restore(Request request, Response response) throws Exception {
var fid = FileStorageId.parse(request.params("fid"));
actorControlService.startFrom(Actor.RESTORE_BACKUP, RestoreBackupActor.RESTORE, fid);
actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, RestoreBackupActor.RESTORE, fid);
return "";
}
}

View File

@@ -2,58 +2,54 @@ package nu.marginalia.executor.svc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.actor.Actor;
import nu.marginalia.actor.ActorControlService;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.ConvertActor;
import nu.marginalia.actor.task.ConvertAndLoadActor;
import nu.marginalia.actor.task.CrawlJobExtractorActor;
import nu.marginalia.actor.task.RecrawlActor;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.executor.model.crawl.RecrawlParameters;
import nu.marginalia.executor.model.load.LoadParameters;
import spark.Request;
import spark.Response;
public class ProcessingService {
private final ActorControlService actorControlService;
private final ExecutorActorControlService actorControlService;
private final Gson gson;
@Inject
public ProcessingService(ActorControlService actorControlService,
public ProcessingService(ExecutorActorControlService actorControlService,
Gson gson) {
this.actorControlService = actorControlService;
this.gson = gson;
}
public Object startRecrawl(Request request, Response response) throws Exception {
var params = gson.fromJson(request.body(), RecrawlParameters.class);
var crawlId = gson.fromJson(request.body(), FileStorageId.class);
actorControlService.start(
Actor.RECRAWL,
RecrawlActor.recrawlFromCrawlDataAndCralSpec(
params.crawlDataId(),
params.crawlSpecIds()
)
ExecutorActor.RECRAWL,
RecrawlActor.recrawlFromCrawlDataAndCrawlSpec(crawlId)
);
return "";
}
public Object startCrawl(Request request, Response response) throws Exception {
actorControlService.start(Actor.CRAWL, FileStorageId.parse(request.params("fid")));
actorControlService.start(ExecutorActor.CRAWL, FileStorageId.parse(request.params("fid")));
return "";
}
public Object startConversion(Request request, Response response) throws Exception {
actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT, FileStorageId.parse(request.params("fid")));
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT, FileStorageId.parse(request.params("fid")));
return "";
}
public Object startConvertLoad(Request request, Response response) throws Exception {
actorControlService.start(
Actor.CONVERT_AND_LOAD,
ExecutorActor.CONVERT_AND_LOAD,
FileStorageId.parse(request.params("fid"))
);
return "";
@@ -65,7 +61,7 @@ public class ProcessingService {
// Start the FSM from the intermediate state that triggers the load
actorControlService.startFrom(
Actor.CONVERT_AND_LOAD,
ExecutorActor.CONVERT_AND_LOAD,
ConvertAndLoadActor.LOAD,
new ConvertAndLoadActor.Message(null, params.ids(),
0L,
@@ -76,20 +72,12 @@
}
public Object startAdjacencyCalculation(Request request, Response response) throws Exception {
actorControlService.start(Actor.ADJACENCY_CALCULATION);
return "";
}
public Object createCrawlSpecFromDb(Request request, Response response) throws Exception {
actorControlService.startFrom(Actor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_DB,
new CrawlJobExtractorActor.CrawlJobExtractorArguments(
request.queryParamOrDefault("description", ""))
);
actorControlService.start(ExecutorActor.ADJACENCY_CALCULATION);
return "";
}
public Object createCrawlSpecFromDownload(Request request, Response response) throws Exception {
actorControlService.startFrom(Actor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_LINK,
actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_LINK,
new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL(
request.queryParamOrDefault("description", ""),
request.queryParamOrDefault("url", ""))

View File

@@ -1,32 +1,32 @@
package nu.marginalia.executor.svc;
import com.google.inject.Inject;
import nu.marginalia.actor.Actor;
import nu.marginalia.actor.ActorControlService;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.ConvertActor;
import spark.Request;
import spark.Response;
public class SideloadService {
private final ActorControlService actorControlService;
private final ExecutorActorControlService actorControlService;
@Inject
public SideloadService(ActorControlService actorControlService) {
public SideloadService(ExecutorActorControlService actorControlService) {
this.actorControlService = actorControlService;
}
public Object sideloadDirtree(Request request, Response response) throws Exception {
actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT_DIRTREE, request.queryParams("path"));
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_DIRTREE, request.queryParams("path"));
return "";
}
public Object sideloadEncyclopedia(Request request, Response response) throws Exception {
actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT_ENCYCLOPEDIA, request.queryParams("path"));
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_ENCYCLOPEDIA, request.queryParams("path"));
return "";
}
public Object sideloadStackexchange(Request request, Response response) throws Exception {
actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT_STACKEXCHANGE, request.queryParams("path"));
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_STACKEXCHANGE, request.queryParams("path"));
return "";
}
}

View File

@@ -0,0 +1,172 @@
package nu.marginalia.executor.svc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.executor.model.transfer.TransferItem;
import nu.marginalia.executor.model.transfer.TransferSpec;
import nu.marginalia.executor.storage.FileStorageContent;
import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType;
import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
import spark.Response;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
public class TransferService {
private final Gson gson;
private final FileStorageService fileStorageService;
private final HikariDataSource dataSource;
private final int nodeId;
private static final Logger logger = LoggerFactory.getLogger(TransferService.class);
@Inject
public TransferService(
Gson gson,
FileStorageService fileStorageService,
HikariDataSource dataSource,
ServiceConfiguration config)
{
this.gson = gson;
this.fileStorageService = fileStorageService;
this.dataSource = dataSource;
this.nodeId = config.node();
}
public Object transferFile(Request request, Response response) throws SQLException, IOException {
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
var fileStorage = fileStorageService.getStorage(fileStorageId);
Path basePath = fileStorage.asPath();
// This is not a public API, so path traversal isn't a concern
Path filePath = basePath.resolve(request.queryParams("path"));
response.type("application/octet-stream");
FileUtils.copyFile(filePath.toFile(), response.raw().getOutputStream());
return "";
}
public FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException {
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
var storage = fileStorageService.getStorage(fileStorageId);
List<FileStorageFile> files;
try (var fs = Files.list(storage.asPath())) {
files = fs.filter(Files::isRegularFile)
.map(this::createFileModel)
.sorted(Comparator.comparing(FileStorageFile::name))
.toList();
}
return new FileStorageContent(files);
}
@SneakyThrows
private FileStorageFile createFileModel(Path path) {
return new FileStorageFile(
path.toFile().getName(),
Files.size(path),
Files.getLastModifiedTime(path).toInstant().toString()
);
}
public TransferSpec getTransferSpec(Request request, Response response) throws SQLException {
List<FileStorageId> fileStorageIds = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_DATA);
if (fileStorageIds.isEmpty()) {
logger.warn("No ACTIVE crawl data");
return new TransferSpec();
}
int count = Integer.parseInt(request.queryParams("count"));
logger.info("Preparing a transfer of {} domains", count);
List<TransferItem> items = new ArrayList<>();
var storage = fileStorageService.getStorage(fileStorageIds.get(0));
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=? AND NODE_AFFINITY=" + nodeId)
) {
for (var item : WorkLog.iterable(storage.asPath().resolve("crawler.log"))) {
if (items.size() >= count)
break;
if (!Files.isRegularFile(storage.asPath().resolve(item.relPath()))) {
logger.info("Ignoring absent item {}", item);
continue;
}
query.setString(1, item.id());
var rs = query.executeQuery();
if (rs.next()) {
items.add(new TransferItem(
item.id(),
rs.getInt(1),
fileStorageIds.get(0),
item.relPath()
));
}
else {
logger.info("Rejected {}", item);
}
}
}
logger.info("Found {} eligible domains", items.size());
return new TransferSpec(items);
}
public Object yieldDomain(Request request, Response response) throws SQLException, IOException {
var item = gson.fromJson(request.body(), TransferItem.class);
var storage = fileStorageService.getStorage(item.fileStorageId());
Files.delete(storage.asPath().resolve(item.path()));
return "";
}
public void pruneCrawlDataMqEndpoint() throws SQLException, IOException {
List<FileStorageId> fileStorageIds = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_DATA);
if (fileStorageIds.isEmpty()) {
return;
}
var storage = fileStorageService.getStorage(fileStorageIds.get(0));
Path newCrawlLogPath = storage.asPath().resolve("crawler.log-new");
Path oldCrawlLogPath = storage.asPath().resolve("crawler.log");
int pruned = 0;
try (var newWorkLog = new WorkLog(newCrawlLogPath)) {
for (var item : WorkLog.iterable(oldCrawlLogPath)) {
if (Files.exists(storage.asPath().resolve(item.relPath()))) {
newWorkLog.setJobToFinished(item.id(), item.path(), item.cnt());
}
else {
pruned++;
}
}
}
if (pruned > 0) {
logger.info("Pruned {} items from the crawl log!", pruned);
}
Files.move(newCrawlLogPath, oldCrawlLogPath, StandardCopyOption.REPLACE_EXISTING);
}
}
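
To make the effect of a transfer concrete, a hedged sketch (not in the commit) of checking the NODE_AFFINITY column this flow updates; the helper class and example domain name are assumptions.

```java
import com.zaxxer.hikari.HikariDataSource;

class AffinityCheckSketch {
    // After a transfer, EC_DOMAIN.NODE_AFFINITY should hold the receiving node's id.
    int nodeAffinityOf(HikariDataSource dataSource, String domainName) throws Exception {
        try (var conn = dataSource.getConnection();
             var stmt = conn.prepareStatement(
                     "SELECT NODE_AFFINITY FROM EC_DOMAIN WHERE DOMAIN_NAME=?")) {
            stmt.setString(1, domainName);
            var rs = stmt.executeQuery();
            if (rs.next())
                return rs.getInt(1);
            throw new IllegalArgumentException("Unknown domain: " + domainName);
        }
    }
}
```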

View File

@@ -4,8 +4,8 @@ import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Provides;
import nu.marginalia.actor.Actor;
import nu.marginalia.actor.ActorControlService;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.CrawlJobExtractorActor;
import nu.marginalia.client.Context;
import nu.marginalia.client.route.RouteProvider;
@@ -14,7 +14,6 @@ import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.executor.model.crawl.RecrawlParameters;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.inbox.MqAsynchronousInbox;
@@ -69,43 +68,43 @@ public class ExecutorSvcApiIntegrationTest {
@Test
public void startStartActor() throws Exception {
testInstances.client.startFsm(Context.internal(), 0, "crawl");
Mockito.verify(testInstances.actorControlService).startJSON(Actor.CRAWL, "\"\"");
Mockito.verify(testInstances.actorControlService).startJSON(ExecutorActor.CRAWL, "\"\"");
}
@Test
public void startStopActor() {
testInstances.client.stopFsm(Context.internal(), 0, "crawl");
Mockito.verify(testInstances.actorControlService).stop(Actor.CRAWL);
Mockito.verify(testInstances.actorControlService).stop(ExecutorActor.CRAWL);
}
@Test
public void triggerCrawl() throws Exception {
testInstances.client.triggerCrawl(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).start(eq(Actor.CRAWL), any());
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CRAWL), any());
}
@Test
public void triggerRecrawl() throws Exception {
testInstances.client.triggerRecrawl(Context.internal(), 0,
new RecrawlParameters(new FileStorageId(0), List.of()));
new FileStorageId(0));
Mockito.verify(testInstances.actorControlService).start(eq(Actor.RECRAWL), any());
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.RECRAWL), any());
}
@Test
public void triggerConvert() throws Exception {
testInstances.client.triggerConvert(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT"), any());
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT"), any());
}
@Test
public void triggerProcessAndLoad() throws Exception {
testInstances.client.triggerProcessAndLoad(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).start(eq(Actor.CONVERT_AND_LOAD), any());
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CONVERT_AND_LOAD), any());
}
@Test
@@ -113,7 +112,7 @@ public class ExecutorSvcApiIntegrationTest {
testInstances.client.loadProcessedData(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).startFrom(
eq(Actor.CONVERT_AND_LOAD),
eq(ExecutorActor.CONVERT_AND_LOAD),
eq("LOAD"),
any());
}
@@ -122,45 +121,39 @@
public void calculateAdjacencies() throws Exception {
testInstances.client.calculateAdjacencies(Context.internal(), 0);
Mockito.verify(testInstances.actorControlService).start(eq(Actor.ADJACENCY_CALCULATION));
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.ADJACENCY_CALCULATION));
}
@Test
public void sideloadDirtree() throws Exception {
testInstances.client.sideloadDirtree(Context.internal(), 0, Path.of("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT_DIRTREE"), eq("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_DIRTREE"), eq("/tmp/test"));
}
@Test
public void sideloadEncyclopedia() throws Exception {
testInstances.client.sideloadEncyclopedia(Context.internal(), 0, Path.of("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT_ENCYCLOPEDIA"), eq("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_ENCYCLOPEDIA"), eq("/tmp/test"));
}
@Test
public void sideloadStackexchange() throws Exception {
testInstances.client.sideloadStackexchange(Context.internal(), 0, Path.of("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT_STACKEXCHANGE"), eq("/tmp/test"));
}
@Test
public void testCreateCrawlSpecFromDb() throws Exception {
testInstances.client.createCrawlSpecFromDb(Context.internal(), 0, "Lorem Ipsum");
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_DB"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArguments("Lorem Ipsum")));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_STACKEXCHANGE"), eq("/tmp/test"));
}
@Test
public void testCreateCrawlSpecFromUrl() throws Exception {
testInstances.client.createCrawlSpecFromDownload(Context.internal(), 0, "Lorem Ipsum", "http://www.example.com");
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_LINK"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL("Lorem Ipsum", "http://www.example.com")));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_LINK"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL("Lorem Ipsum", "http://www.example.com")));
}
@Test
public void backupRestore() throws Exception {
testInstances.client.restoreBackup(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.RESTORE_BACKUP), eq("RESTORE"), eq(new FileStorageId(1)));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RESTORE_BACKUP), eq("RESTORE"), eq(new FileStorageId(1)));
}
}
@@ -170,14 +163,14 @@ class TestInstances {
@Inject
ExecutorClient client;
@Inject
ActorControlService actorControlService;
ExecutorActorControlService actorControlService;
}
class TestModule extends AbstractModule {
@Override
public void configure() {
System.setProperty("service-name", "test");
bind(ActorControlService.class).toInstance(Mockito.mock(ActorControlService.class));
bind(ExecutorActorControlService.class).toInstance(Mockito.mock(ExecutorActorControlService.class));
bind(FileStorageService.class).toInstance(Mockito.mock(FileStorageService.class));
bind(ProcessService.class).toInstance(Mockito.mock(ProcessService.class));
bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class));

View File

@@ -66,7 +66,7 @@ data from https://downloads.marginalia.nu/ and extract it to the correct locatio
The system will pick the data up automatically.
```shell
$ run/download-samples l
$ run/download-samples.sh l
```
Four sets are available: