diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java index a66186ca..8981132a 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java @@ -4,18 +4,21 @@ import com.google.inject.Inject; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; import nu.marginalia.executor.model.ActorRunStates; -import nu.marginalia.executor.model.crawl.RecrawlParameters; import nu.marginalia.executor.model.load.LoadParameters; +import nu.marginalia.executor.model.transfer.TransferItem; +import nu.marginalia.executor.model.transfer.TransferSpec; import nu.marginalia.executor.storage.FileStorageContent; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; import nu.marginalia.storage.model.FileStorageId; +import java.io.OutputStream; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Path; import java.util.List; +import java.util.concurrent.TimeUnit; public class ExecutorClient extends AbstractDynamicClient { @Inject @@ -35,8 +38,8 @@ public class ExecutorClient extends AbstractDynamicClient { post(ctx, node, "/process/crawl/" + fid, "").blockingSubscribe(); } - public void triggerRecrawl(Context ctx, int node, RecrawlParameters parameters) { - post(ctx, node, "/process/recrawl", parameters).blockingSubscribe(); + public void triggerRecrawl(Context ctx, int node, FileStorageId fid) { + post(ctx, node, "/process/recrawl", fid).blockingSubscribe(); } public void triggerConvert(Context ctx, int node, FileStorageId fid) { @@ -88,11 +91,6 @@ public class ExecutorClient extends AbstractDynamicClient { "").blockingSubscribe(); } - public void createCrawlSpecFromDb(Context context, int node, String description) { - post(context, node, "/process/crawl-spec/from-db?description="+URLEncoder.encode(description, StandardCharsets.UTF_8), "") - .blockingSubscribe(); - } - public void createCrawlSpecFromDownload(Context context, int node, String description, String url) { post(context, node, "/process/crawl-spec/from-download?description="+URLEncoder.encode(description, StandardCharsets.UTF_8)+"&url="+URLEncoder.encode(url, StandardCharsets.UTF_8), "") .blockingSubscribe(); @@ -110,4 +108,21 @@ public class ExecutorClient extends AbstractDynamicClient { return get(context, node, "/storage/"+fileId.id(), FileStorageContent.class).blockingFirst(); } + public void transferFile(Context context, int node, FileStorageId fileId, String path, OutputStream destOutputStream) { + String endpoint = "/transfer/file/%d?path=%s".formatted(fileId.id(), URLEncoder.encode(path, StandardCharsets.UTF_8)); + + get(context, node, endpoint, + destOutputStream) + .blockingSubscribe(); + } + + public TransferSpec getTransferSpec(Context context, int node, int count) { + return get(context, node, "/transfer/spec?count="+count, TransferSpec.class) + .timeout(30, TimeUnit.MINUTES) + .blockingFirst(); + } + + public void yieldDomain(Context context, int node, TransferItem item) { + post(context, node, "/transfer/yield", item).blockingSubscribe(); + } } diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/crawl/RecrawlParameters.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/crawl/RecrawlParameters.java deleted file mode 100644 index e4fbefc3..00000000 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/crawl/RecrawlParameters.java +++ /dev/null @@ -1,11 +0,0 @@ -package nu.marginalia.executor.model.crawl; - -import nu.marginalia.storage.model.FileStorageId; - -import java.util.List; - -public record RecrawlParameters( - FileStorageId crawlDataId, - List crawlSpecIds -) { -} diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/transfer/TransferItem.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/transfer/TransferItem.java new file mode 100644 index 00000000..17be3dd8 --- /dev/null +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/transfer/TransferItem.java @@ -0,0 +1,9 @@ +package nu.marginalia.executor.model.transfer; + +import nu.marginalia.storage.model.FileStorageId; + +public record TransferItem(String domainName, + int domainId, + FileStorageId fileStorageId, + String path) { +} diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/transfer/TransferSpec.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/transfer/TransferSpec.java new file mode 100644 index 00000000..8048a411 --- /dev/null +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/transfer/TransferSpec.java @@ -0,0 +1,13 @@ +package nu.marginalia.executor.model.transfer; + +import java.util.List; + +public record TransferSpec(List items) { + public TransferSpec() { + this(List.of()); + } + + public int size() { + return items.size(); + } +} diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java index f8376d53..40cd30ce 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/crawling/CrawlRequest.java @@ -8,6 +8,14 @@ import java.util.List; /** A request to start a crawl */ @AllArgsConstructor public class CrawlRequest { + /** (optional) Crawl spec(s) for sourcing domains to crawl. If not set, + * the EC_DOMAIN table will be consulted and domains with the corresponding + * node affinity will be used. + */ public List specStorage; + + /** File storage where the crawl data will be written. If it contains existing crawl data, + * this crawl data will be referenced for e-tags and last-mofified checks. + */ public FileStorageId crawlStorage; } diff --git a/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java b/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java index 956682ce..7fbd646c 100644 --- a/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java +++ b/code/common/config/src/main/java/nu/marginalia/storage/FileStorageService.java @@ -488,6 +488,10 @@ public class FileStorageService { } } + public List getActiveFileStorages(FileStorageType type) throws SQLException { + return getActiveFileStorages(node, type); + } + public List getActiveFileStorages(int nodeId, FileStorageType type) throws SQLException { diff --git a/code/common/db/src/main/java/nu/marginalia/db/DbDomainStatsExportMultitool.java b/code/common/db/src/main/java/nu/marginalia/db/DbDomainStatsExportMultitool.java index 0288179a..6fa07c7e 100644 --- a/code/common/db/src/main/java/nu/marginalia/db/DbDomainStatsExportMultitool.java +++ b/code/common/db/src/main/java/nu/marginalia/db/DbDomainStatsExportMultitool.java @@ -14,6 +14,7 @@ import java.util.OptionalInt; */ public class DbDomainStatsExportMultitool implements AutoCloseable { private final Connection connection; + private final int nodeId; private final PreparedStatement knownUrlsQuery; private final PreparedStatement visitedUrlsQuery; private final PreparedStatement goodUrlsQuery; @@ -23,8 +24,9 @@ public class DbDomainStatsExportMultitool implements AutoCloseable { private final PreparedStatement crawlQueueDomains; private final PreparedStatement indexedDomainsQuery; - public DbDomainStatsExportMultitool(HikariDataSource dataSource) throws SQLException { + public DbDomainStatsExportMultitool(HikariDataSource dataSource, int nodeId) throws SQLException { this.connection = dataSource.getConnection(); + this.nodeId = nodeId; knownUrlsQuery = connection.prepareStatement(""" SELECT KNOWN_URLS @@ -64,21 +66,14 @@ public class DbDomainStatsExportMultitool implements AutoCloseable { """); } - public OptionalInt getKnownUrls(String domainName) throws SQLException { - return executeNameToIntQuery(domainName, knownUrlsQuery); - } public OptionalInt getVisitedUrls(String domainName) throws SQLException { return executeNameToIntQuery(domainName, visitedUrlsQuery); } - public OptionalInt getGoodUrls(String domainName) throws SQLException { - return executeNameToIntQuery(domainName, goodUrlsQuery); - } + public OptionalInt getDomainId(String domainName) throws SQLException { return executeNameToIntQuery(domainName, domainNameToId); } - public List getAllDomains() throws SQLException { - return executeListQuery(allDomainsQuery, 100_000); - } + public List getCrawlQueueDomains() throws SQLException { return executeListQuery(crawlQueueDomains, 100); } diff --git a/code/common/db/src/main/resources/db/migration/V23_11_0_007__domain_node_affinity.sql b/code/common/db/src/main/resources/db/migration/V23_11_0_007__domain_node_affinity.sql new file mode 100644 index 00000000..6774ec2a --- /dev/null +++ b/code/common/db/src/main/resources/db/migration/V23_11_0_007__domain_node_affinity.sql @@ -0,0 +1 @@ +ALTER TABLE EC_DOMAIN ADD COLUMN NODE_AFFINITY INT NOT NULL; \ No newline at end of file diff --git a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java index d0cf0ef8..306389f6 100644 --- a/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java +++ b/code/common/process/src/main/java/nu/marginalia/process/log/WorkLogEntry.java @@ -20,4 +20,18 @@ public record WorkLogEntry(String id, String ts, String path, int cnt) { return splitPattern.split(s, 2)[0]; } + public String fileName() { + if (path.contains("/")) { + return path.substring(path.lastIndexOf("/") + 1); + } + return path; + } + + public String relPath() { + // Compatibility trick! + + String relPath = fileName(); + + return relPath.substring(0, 2) + "/" + relPath.substring(2, 4) + "/" + relPath; + } } diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java index da4f0fe4..92e1ff0c 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java @@ -17,7 +17,9 @@ import okhttp3.*; import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import spark.utils.IOUtils; +import java.io.OutputStream; import java.net.ConnectException; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -231,6 +233,23 @@ public abstract class AbstractClient implements AutoCloseable { .doFinally(() -> ThreadContext.remove("outbound-request")); } + protected synchronized Observable get(Context ctx, int node, String endpoint, OutputStream outputStream) { + ensureAlive(node); + + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build(); + + return Observable.just(client.newCall(req)) + .subscribeOn(scheduler().get()) + .map(this::logInbound) + .map(Call::execute) + .map(this::logOutbound) + .map(rsp -> validateResponseStatus(rsp, req, 200)) + .map(rsp -> copyToOutputStream(rsp, outputStream)) + .retryWhen(this::retryHandler) + .timeout(timeout, TimeUnit.SECONDS) + .doFinally(() -> ThreadContext.remove("outbound-request")); + } + @SuppressWarnings("unchecked") protected synchronized Observable get(Context ctx, int node, String endpoint) { ensureAlive(node); @@ -352,6 +371,13 @@ public abstract class AbstractClient implements AutoCloseable { } + @SneakyThrows + private Integer copyToOutputStream(Response response, OutputStream outputStream) { + try (response) { + return IOUtils.copy(response.body().byteStream(), outputStream); + } + } + @SneakyThrows private T getEntity(Response response, Class clazz) { try (response) { diff --git a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java index 5bd4baf6..294bf88e 100644 --- a/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java +++ b/code/libraries/blocking-thread-pool/src/main/java/nu/marginalia/util/SimpleBlockingThreadPool.java @@ -56,7 +56,7 @@ public class SimpleBlockingThreadPool { } private void worker() { - while (!tasks.isEmpty() || !shutDown) { + while (!(tasks.isEmpty() && shutDown)) { try { Task task = tasks.poll(1, TimeUnit.SECONDS); if (task == null) { @@ -90,13 +90,6 @@ public class SimpleBlockingThreadPool { final long start = System.currentTimeMillis(); final long deadline = start + timeUnit.toMillis(i); - // Drain the queue - while (!tasks.isEmpty()) { - long timeRemaining = deadline - System.currentTimeMillis(); - if (timeRemaining <= 0) - return false; - } - // Wait for termination for (var thread : workers) { if (!thread.isAlive()) diff --git a/code/process-models/crawl-spec/src/main/java/nu/marginalia/crawlspec/CrawlSpecGenerator.java b/code/process-models/crawl-spec/src/main/java/nu/marginalia/crawlspec/CrawlSpecGenerator.java index 17c4012b..a8140232 100644 --- a/code/process-models/crawl-spec/src/main/java/nu/marginalia/crawlspec/CrawlSpecGenerator.java +++ b/code/process-models/crawl-spec/src/main/java/nu/marginalia/crawlspec/CrawlSpecGenerator.java @@ -2,7 +2,6 @@ package nu.marginalia.crawlspec; import nu.marginalia.db.DbDomainStatsExportMultitool; import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileWriter; -import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import java.io.IOException; @@ -126,18 +125,5 @@ public class CrawlSpecGenerator { "https://" + domainName + "/" ); } - - static KnownUrlsListSource fromLinkdb(DbDomainStatsExportMultitool dbData, - LinkdbReader linkdbReader) - { - return domainName -> { - var maybeId = dbData.getDomainId(domainName); - if (maybeId.isEmpty()) - return List.of(); - - return linkdbReader - .getUrlsFromDomain(maybeId.getAsInt()); - }; - } } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 6b9201af..80e64e01 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -123,6 +123,7 @@ public class ConverterMain { int totalDomains = plan.countCrawledDomains(); AtomicInteger processedDomains = new AtomicInteger(0); + logger.info("Processing {} domains", totalDomains); // Advance the progress bar to the current position if this is a resumption processedDomains.set(batchingWorkLog.size()); @@ -138,6 +139,9 @@ public class ConverterMain { }); } + // Grace period in case we're loading like 1 item + Thread.sleep(100); + pool.shutDown(); do { System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining"); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index cd6f5bd1..5b9cb175 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -10,10 +10,12 @@ import nu.marginalia.UserAgent; import nu.marginalia.WmsaHome; import nu.marginalia.crawl.retreival.CrawlDataReference; import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl; +import nu.marginalia.crawl.spec.CrawlSpecProvider; +import nu.marginalia.crawl.spec.DbCrawlSpecProvider; +import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider; import nu.marginalia.crawling.io.CrawledDomainReader; import nu.marginalia.crawlspec.CrawlSpecFileNames; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileReader; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.MqMessage; @@ -53,6 +55,7 @@ public class CrawlerMain { private final UserAgent userAgent; private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; + private final DbCrawlSpecProvider dbCrawlSpecProvider; private final Gson gson; private final int node; private final SimpleBlockingThreadPool pool; @@ -72,11 +75,13 @@ public class CrawlerMain { MessageQueueFactory messageQueueFactory, FileStorageService fileStorageService, ProcessConfiguration processConfiguration, + DbCrawlSpecProvider dbCrawlSpecProvider, Gson gson) { this.heartbeat = heartbeat; this.userAgent = userAgent; this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; + this.dbCrawlSpecProvider = dbCrawlSpecProvider; this.gson = gson; this.node = processConfiguration.node(); @@ -109,7 +114,7 @@ public class CrawlerMain { var instructions = crawler.fetchInstructions(); try { - crawler.run(instructions.crawlSpec, instructions.outputDir); + crawler.run(instructions.specProvider, instructions.outputDir); instructions.ok(); } catch (Exception ex) { @@ -123,30 +128,24 @@ public class CrawlerMain { System.exit(0); } - public void run(List crawlSpec, Path outputDir) throws InterruptedException, IOException { + public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException { heartbeat.start(); try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"))) { // First a validation run to ensure the file is all good to parse logger.info("Validating JSON"); - int taskCount = 0; - for (var specs : crawlSpec) { - taskCount += CrawlSpecRecordParquetFileReader.count(specs); - } - totalTasks = taskCount; + totalTasks = specProvider.totalCount(); - logger.info("Queued {} crawl tasks, let's go", taskCount); + logger.info("Queued {} crawl tasks, let's go", totalTasks); - for (var specs : crawlSpec) { - try (var specStream = CrawlSpecRecordParquetFileReader.stream(specs)) { - specStream - .takeWhile((e) -> abortMonitor.isAlive()) - .filter(e -> !workLog.isJobFinished(e.domain)) - .filter(e -> processingIds.put(e.domain, "") == null) - .map(e -> new CrawlTask(e, outputDir, workLog)) - .forEach(pool::submitQuietly); - } + try (var specStream = specProvider.stream()) { + specStream + .takeWhile((e) -> abortMonitor.isAlive()) + .filter(e -> !workLog.isJobFinished(e.domain)) + .filter(e -> processingIds.put(e.domain, "") == null) + .map(e -> new CrawlTask(e, outputDir, workLog)) + .forEach(pool::submitQuietly); } logger.info("Shutting down the pool, waiting for tasks to complete..."); @@ -155,6 +154,9 @@ public class CrawlerMain { do { System.out.println("Waiting for pool to terminate... " + pool.getActiveCount() + " remaining"); } while (!pool.awaitTermination(60, TimeUnit.SECONDS)); + } + catch (Exception ex) { + } finally { heartbeat.shutDown(); @@ -227,15 +229,15 @@ public class CrawlerMain { private static class CrawlRequest { - private final List crawlSpec; + private final CrawlSpecProvider specProvider; private final Path outputDir; private final MqMessage message; private final MqSingleShotInbox inbox; - CrawlRequest(List crawlSpec, Path outputDir, MqMessage message, MqSingleShotInbox inbox) { + CrawlRequest(CrawlSpecProvider specProvider, Path outputDir, MqMessage message, MqSingleShotInbox inbox) { this.message = message; this.inbox = inbox; - this.crawlSpec = crawlSpec; + this.specProvider = specProvider; this.outputDir = outputDir; } @@ -259,11 +261,20 @@ public class CrawlerMain { var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.crawling.CrawlRequest.class); - var specData = fileStorageService.getStorage(request.specStorage); + CrawlSpecProvider specProvider; + + if (request.specStorage != null) { + var specData = fileStorageService.getStorage(request.specStorage); + specProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData)); + } + else { + specProvider = dbCrawlSpecProvider; + } + var crawlData = fileStorageService.getStorage(request.crawlStorage); return new CrawlRequest( - CrawlSpecFileNames.resolve(specData), + specProvider, crawlData.asPath(), msg, inbox); diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java new file mode 100644 index 00000000..22177d6f --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/CrawlSpecProvider.java @@ -0,0 +1,10 @@ +package nu.marginalia.crawl.spec; + +import nu.marginalia.model.crawlspec.CrawlSpecRecord; + +import java.util.stream.Stream; + +public interface CrawlSpecProvider { + int totalCount() throws Exception; + Stream stream(); +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java new file mode 100644 index 00000000..908445c3 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java @@ -0,0 +1,76 @@ +package nu.marginalia.crawl.spec; + +import com.google.inject.Inject; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.ProcessConfiguration; +import nu.marginalia.model.crawlspec.CrawlSpecRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Stream; + +public class DbCrawlSpecProvider implements CrawlSpecProvider { + private final HikariDataSource dataSource; + private final ProcessConfiguration processConfiguration; + private List domains; + + private static final Logger logger = LoggerFactory.getLogger(DbCrawlSpecProvider.class); + + @Inject + public DbCrawlSpecProvider(HikariDataSource dataSource, + ProcessConfiguration processConfiguration + ) { + this.dataSource = dataSource; + this.processConfiguration = processConfiguration; + } + + // Load the domains into memory to ensure the crawler is resilient to database blips + private List loadData() throws SQLException { + var domains = new ArrayList(); + + logger.info("Loading domains to be crawled"); + + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT DOMAIN_NAME, COALESCE(GOOD_URLS, 0) + FROM EC_DOMAIN + LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID + WHERE NODE_AFFINITY=? + """)) + { + query.setInt(1, processConfiguration.node()); + query.setFetchSize(10_000); + var rs = query.executeQuery(); + while (rs.next()) { + domains.add(new CrawlSpecRecord( + rs.getString(1), + Math.clamp((int) (1.25 * rs.getInt(2)), 200, 10_000), + List.of() + )); + } + } + + logger.info("Loaded {} domains", domains.size()); + + return domains; + } + + + @Override + public int totalCount() throws SQLException { + if (domains == null) { + domains = loadData(); + } + return domains.size(); + } + + @Override + public Stream stream() { + return domains.stream(); + } + + +} diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/ParquetCrawlSpecProvider.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/ParquetCrawlSpecProvider.java new file mode 100644 index 00000000..db272623 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/ParquetCrawlSpecProvider.java @@ -0,0 +1,37 @@ +package nu.marginalia.crawl.spec; + +import lombok.SneakyThrows; +import nu.marginalia.io.crawlspec.CrawlSpecRecordParquetFileReader; +import nu.marginalia.model.crawlspec.CrawlSpecRecord; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.List; +import java.util.stream.Stream; + +public class ParquetCrawlSpecProvider implements CrawlSpecProvider { + private final List files; + + public ParquetCrawlSpecProvider(List files) { + this.files = files; + } + + @Override + public int totalCount() throws IOException { + int total = 0; + for (var specs : files) { + total += CrawlSpecRecordParquetFileReader.count(specs); + } + return total; + } + + @Override + public Stream stream() { + return files.stream().flatMap(this::streamQuietly); + } + + @SneakyThrows + private Stream streamQuietly(Path file) { + return CrawlSpecRecordParquetFileReader.stream(file); + } +} diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainLoaderService.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainLoaderService.java index 94819190..6d065814 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainLoaderService.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainLoaderService.java @@ -3,16 +3,17 @@ package nu.marginalia.loading.domains; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.ProcessConfiguration; import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader; import nu.marginalia.io.processed.DomainRecordParquetFileReader; -import nu.marginalia.io.processed.ProcessedDataFileNames; import nu.marginalia.loading.LoaderInputData; import nu.marginalia.model.EdgeDomain; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.nio.file.Path; +import java.sql.Connection; +import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.*; @@ -21,10 +22,14 @@ public class DomainLoaderService { private final HikariDataSource dataSource; private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class); + private final int nodeId; @Inject - public DomainLoaderService(HikariDataSource dataSource) { + public DomainLoaderService(HikariDataSource dataSource, + ProcessConfiguration processConfiguration + ) { this.dataSource = dataSource; + this.nodeId = processConfiguration.node(); } /** Read the domain names from each parquet file @@ -34,51 +39,32 @@ public class DomainLoaderService { public DomainIdRegistry getOrCreateDomainIds(LoaderInputData inputData) throws IOException, SQLException { - Collection domainNamesAll = readDomainNames(inputData); - return getDatabaseIds(domainNamesAll); - } - - Collection readDomainNames(LoaderInputData inputData) throws IOException { - final Set domainNamesAll = new HashSet<>(100_000); - - var domainFiles = inputData.listDomainFiles(); - for (var file : domainFiles) { - domainNamesAll.addAll(DomainRecordParquetFileReader.getDomainNames(file)); - } - - var linkFiles = inputData.listDomainLinkFiles(); - for (var file : linkFiles) { - domainNamesAll.addAll(DomainLinkRecordParquetFileReader.getDestDomainNames(file)); - } - - return domainNamesAll; - } - - DomainIdRegistry getDatabaseIds(Collection domainNamesAll) throws SQLException { + Set domainNamesAll = new HashSet<>(); DomainIdRegistry ret = new DomainIdRegistry(); try (var conn = dataSource.getConnection(); - var insertStmt = conn.prepareStatement(""" - INSERT IGNORE INTO EC_DOMAIN (DOMAIN_NAME, DOMAIN_TOP) VALUES (?, ?) - """); var selectStmt = conn.prepareStatement(""" SELECT ID, DOMAIN_NAME FROM EC_DOMAIN WHERE DOMAIN_NAME=? """) ) { - int i = 0; - for (var domain : domainNamesAll) { - var parsed = new EdgeDomain(domain); - insertStmt.setString(1, domain); - insertStmt.setString(2, parsed.domain); - insertStmt.addBatch(); - if (++i > 1000) { - i = 0; - insertStmt.executeBatch(); + try (var inserter = new DomainInserter(conn, nodeId)) { + for (var domain : readSetDomainNames(inputData)) { + inserter.accept(new EdgeDomain(domain)); + domainNamesAll.add(domain); } } - if (i > 0) { - insertStmt.executeBatch(); + try (var inserter = new DomainInserter(conn, -1)) { + for (var domain : readReferencedDomainNames(inputData)) { + inserter.accept(new EdgeDomain(domain)); + domainNamesAll.add(domain); + } + } + + try (var updater = new DomainAffinityUpdater(conn, nodeId)) { + for (var domain : readSetDomainNames(inputData)) { + updater.accept(new EdgeDomain(domain)); + } } for (var domain : domainNamesAll) { @@ -95,4 +81,87 @@ public class DomainLoaderService { return ret; } + + Collection readSetDomainNames(LoaderInputData inputData) throws IOException { + final Set domainNamesAll = new HashSet<>(100_000); + + var domainFiles = inputData.listDomainFiles(); + for (var file : domainFiles) { + domainNamesAll.addAll(DomainRecordParquetFileReader.getDomainNames(file)); + } + + return domainNamesAll; + } + + Collection readReferencedDomainNames(LoaderInputData inputData) throws IOException { + final Set domainNamesAll = new HashSet<>(100_000); + + var linkFiles = inputData.listDomainLinkFiles(); + for (var file : linkFiles) { + domainNamesAll.addAll(DomainLinkRecordParquetFileReader.getDestDomainNames(file)); + } + + return domainNamesAll; + } + + private class DomainInserter implements AutoCloseable { + private final PreparedStatement statement; + private final int nodeAffinity; + + + private int count = 0; + + public DomainInserter(Connection connection, int affinity) throws SQLException { + nodeAffinity = affinity; + statement = connection.prepareStatement("INSERT IGNORE INTO EC_DOMAIN (DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY) VALUES (?, ?, ?)"); + } + + public void accept(EdgeDomain domain) throws SQLException { + statement.setString(1, domain.toString()); + statement.setString(2, domain.domain); + statement.setInt(3, nodeAffinity); + statement.addBatch(); + + if (++count > 1000) { + statement.executeBatch(); + } + } + + @Override + public void close() throws SQLException { + if (count > 0) { + statement.executeBatch(); + } + statement.close(); + } + } + private static class DomainAffinityUpdater implements AutoCloseable { + private final PreparedStatement statement; + private final int nodeAffinity; + + private int count = 0; + + public DomainAffinityUpdater(Connection connection, int affinity) throws SQLException { + this.nodeAffinity = affinity; + statement = connection.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY = ? WHERE DOMAIN_NAME=?"); + } + + public void accept(EdgeDomain domain) throws SQLException { + statement.setInt(1, nodeAffinity); + statement.setString(2, domain.toString()); + statement.addBatch(); + + if (++count > 1000) { + statement.executeBatch(); + } + } + + @Override + public void close() throws SQLException { + if (count > 0) { + statement.executeBatch(); + } + statement.close(); + } + } } diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java index 9c51415c..e199580f 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java @@ -1,6 +1,8 @@ package nu.marginalia.loading.domains; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import nu.marginalia.ProcessConfiguration; import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter; import nu.marginalia.io.processed.DomainRecordParquetFileWriter; import nu.marginalia.io.processed.ProcessedDataFileNames; @@ -57,6 +59,7 @@ class DomainLoaderServiceTest { toDelete.clear(); } + @Test void readDomainNames() throws IOException { Path workDir = Files.createTempDirectory(getClass().getSimpleName()); @@ -92,43 +95,14 @@ class DomainLoaderServiceTest { } } // Read them - var domainService = new DomainLoaderService(null); - var domainNames = domainService.readDomainNames(new LoaderInputData(workDir, 2)); + var domainService = new DomainLoaderService(null, new ProcessConfiguration("test", 1, UUID.randomUUID())); // Verify - Set expectedDomains = Stream.of(domains1, domains2, linkDomains) - .flatMap(List::stream) - .collect(Collectors.toSet()); - assertEquals(expectedDomains, domainNames); - } + Set expectedDomains1 = Sets.union(new HashSet<>(domains1), new HashSet<>(domains2)); + assertEquals(expectedDomains1, domainService.readSetDomainNames(new LoaderInputData(workDir, 2))); - @Test - void getDatabaseIds() { - try (var dataSource = DbTestUtil.getConnection(mariaDBContainer.getJdbcUrl())) { - var domainService = new DomainLoaderService(dataSource); - - for (int i = 0; i < 2; i++) { - // run the test case twice to cover both the insert and query cases - System.out.println("Case " + i); - - var domains = List.of("memex.marginalia.nu", "www.marginalia.nu", "search.marginalia.nu", "wiby.me"); - var data = domainService.getDatabaseIds(domains); - - Map ids = new HashMap<>(); - - for (String domain : domains) { - ids.put(domain, data.getDomainId(domain)); - } - - // Verify we got 4 domain IDs for the provided inputs - var entries = new HashSet<>(ids.values()); - assertEquals(4, entries.size()); - assertEquals(Set.of(1,2,3,4), entries); // this may be fragile? - } - - } catch (SQLException e) { - Assertions.fail(e); - } + Set expectedDomains2 = new HashSet<>(linkDomains); + assertEquals(expectedDomains2, domainService.readReferencedDomainNames(new LoaderInputData(workDir, 2))); } private DomainRecord dr(String domainName) { diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java index dc12d2c5..3b27f2fb 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java @@ -1,6 +1,9 @@ package nu.marginalia.loading.links; import com.google.common.collect.Lists; +import com.zaxxer.hikari.HikariConfig; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.ProcessConfiguration; import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter; import nu.marginalia.io.processed.DomainRecordParquetFileWriter; import nu.marginalia.io.processed.ProcessedDataFileNames; @@ -11,7 +14,6 @@ import nu.marginalia.model.processed.DomainLinkRecord; import nu.marginalia.model.processed.DomainRecord; import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat; -import nu.marginalia.process.control.ProcessHeartbeatImpl; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -43,8 +45,38 @@ class DomainLinksLoaderServiceTest { .withInitScript("db/migration/V23_06_0_000__base.sql") .withNetworkAliases("mariadb"); + HikariDataSource dataSource; + @BeforeEach public void setUp() { + + HikariConfig config = new HikariConfig(); + config.setJdbcUrl(mariaDBContainer.getJdbcUrl()); + config.setUsername("wmsa"); + config.setPassword("wmsa"); + + dataSource = new HikariDataSource(config); + + List migrations = List.of("db/migration/V23_11_0_007__domain_node_affinity.sql"); + for (String migration : migrations) { + try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration), + "Could not load migration script " + migration); + var conn = dataSource.getConnection(); + var stmt = conn.createStatement() + ) { + String script = new String(resource.readAllBytes()); + String[] cmds = script.split("\\s*;\\s*"); + for (String cmd : cmds) { + if (cmd.isBlank()) + continue; + System.out.println(cmd); + stmt.executeUpdate(cmd); + } + } catch (IOException | SQLException ex) { + + } + } + heartbeat = Mockito.mock(ProcessHeartbeat.class); Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn( @@ -59,6 +91,7 @@ class DomainLinksLoaderServiceTest { } toDelete.clear(); + dataSource.close(); } @Test @@ -99,7 +132,7 @@ class DomainLinksLoaderServiceTest { SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK """) ) { - var domainService = new DomainLoaderService(dataSource); + var domainService = new DomainLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID())); var input = new LoaderInputData(workDir, 2); var domainRegistry = domainService.getOrCreateDomainIds(input); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java index 1f5af080..5ca890f8 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -5,7 +5,6 @@ import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; import nu.marginalia.control.app.svc.*; import nu.marginalia.control.node.svc.ControlNodeActionsService; -import nu.marginalia.control.node.svc.ControlActorService; import nu.marginalia.control.node.svc.ControlFileStorageService; import nu.marginalia.control.node.svc.ControlNodeService; import nu.marginalia.control.sys.svc.ControlSysActionsService; @@ -44,7 +43,6 @@ public class ControlService extends Service { HeartbeatService heartbeatService, EventLogService eventLogService, RendererFactory rendererFactory, - ControlActorService controlActorService, StaticResources staticResources, MessageQueueService messageQueueService, ControlFileStorageService controlFileStorageService, @@ -71,7 +69,6 @@ public class ControlService extends Service { // node controlFileStorageService.register(); - controlActorService.register(); nodeActionsService.register(); controlNodeService.register(); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java new file mode 100644 index 00000000..437b9302 --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java @@ -0,0 +1,9 @@ +package nu.marginalia.control.actor; + +public enum ControlActor { + REBALANCE; + + public String id() { + return "fsm:" + name().toLowerCase(); + } +} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java new file mode 100644 index 00000000..a4061d62 --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java @@ -0,0 +1,111 @@ +package nu.marginalia.control.actor; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import lombok.SneakyThrows; +import nu.marginalia.actor.ActorStateMachine; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.control.actor.rebalance.RebalanceActor; +import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.service.control.ServiceEventLog; +import nu.marginalia.service.server.BaseServiceParams; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + + +@Singleton +public class ControlActorService { + private final ServiceEventLog eventLog; + private final Gson gson; + private final MessageQueueFactory messageQueueFactory; + public Map stateMachines = new HashMap<>(); + public Map actorDefinitions = new HashMap<>(); + private final int node; + @Inject + public ControlActorService(MessageQueueFactory messageQueueFactory, + BaseServiceParams baseServiceParams, + RebalanceActor rebalanceActor + ) { + this.messageQueueFactory = messageQueueFactory; + this.eventLog = baseServiceParams.eventLog; + this.gson = GsonFactory.get(); + this.node = baseServiceParams.configuration.node(); + + register(ControlActor.REBALANCE, rebalanceActor); + } + + private void register(ControlActor process, AbstractActorPrototype graph) { + var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph); + sm.listen((function, param) -> logStateChange(process, function)); + + stateMachines.put(process, sm); + actorDefinitions.put(process, graph); + } + + private void logStateChange(ControlActor process, String state) { + eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state); + } + + public void startFrom(ControlActor process, String state) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state); + } + + public void start(ControlActor process) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).init(); + } + + public void startFrom(ControlActor process, String state, Object arg) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state, gson.toJson(arg)); + } + + public void startFromJSON(ControlActor process, String state, String json) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state, json); + } + + public void start(ControlActor process, Object arg) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).init(gson.toJson(arg)); + } + public void startJSON(ControlActor process, String json) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).init(json); + } + @SneakyThrows + public void stop(ControlActor process) { + eventLog.logEvent("FSM-STOP", process.id()); + + stateMachines.get(process).abortExecution(); + } + + public Map getActorStates() { + return stateMachines.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, e -> e.getValue().getState()) + ); + } + + public boolean isDirectlyInitializable(ControlActor actor) { + return actorDefinitions.get(actor).isDirectlyInitializable(); + } + + public AbstractActorPrototype getActorDefinition(ControlActor actor) { + return actorDefinitions.get(actor); + } + +} \ No newline at end of file diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java new file mode 100644 index 00000000..97b9a4a5 --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java @@ -0,0 +1,169 @@ +package nu.marginalia.control.actor.rebalance; + +import com.google.inject.Inject; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.actor.ActorStateFactory; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.nodecfg.model.NodeConfiguration; +import org.jetbrains.annotations.NotNull; + +import java.sql.SQLException; +import java.util.*; + +public class RebalanceActor extends AbstractActorPrototype { + // States + + public static final String INIT = "INIT"; + public static final String CALCULATE_TRANSACTIONS = "CALCULATE_TRANSACTIONS"; + public static final String END = "END"; + + private final NodeConfigurationService nodeConfigurationService; + private final MqPersistence mqPersistence; + private final HikariDataSource dataSource; + + @Override + public String describe() { + return "Rebalances crawl data among the nodes"; + } + + @Inject + public RebalanceActor(ActorStateFactory stateFactory, + NodeConfigurationService nodeConfigurationService, + MqPersistence mqPersistence, HikariDataSource dataSource) + { + super(stateFactory); + this.nodeConfigurationService = nodeConfigurationService; + this.mqPersistence = mqPersistence; + this.dataSource = dataSource; + } + + @ActorState(name= INIT, next = CALCULATE_TRANSACTIONS, resume = ActorResumeBehavior.ERROR, + description = "Fetches the number of domains assigned to each eligible processing node") + public Map getPopulations() throws Exception { + return getNodePopulations(); + } + + @ActorState(name= CALCULATE_TRANSACTIONS, next = END, resume = ActorResumeBehavior.ERROR, + description = "Calculates how many domains to re-assign between the processing nodes" + ) + public List calculateTransactions(Map populations) { + + if (populations.size() <= 1) { + transition(END); + } + + int average = (int) populations.values().stream().mapToInt(Integer::valueOf).average().orElse(0); + int tolerance = average / 10; + + PriorityQueue surplusList = new PriorityQueue<>(); + PriorityQueue deficitList = new PriorityQueue<>(); + + populations.forEach((node, count) -> { + int delta = count - average; + if (delta - tolerance > 0) { + surplusList.add(new Sur(node, delta)); + } + else if (delta + tolerance < 0) { + deficitList.add(new Def(node, -delta)); + } + }); + + List actions = new ArrayList<>(); + + while (!surplusList.isEmpty() && !deficitList.isEmpty()) { + var sur = surplusList.poll(); + var def = deficitList.poll(); + + assert (sur.n != def.n); + + int amount = Math.min(sur.c, def.c); + actions.add(new Give(sur.n, def.n, amount)); + + if (sur.c - amount > tolerance) { + surplusList.add(new Sur(sur.n, sur.c - amount)); + } + if (def.c - amount > tolerance) { + deficitList.add(new Def(def.n, def.c - amount)); + } + } + + return actions; + } + + private Map getNodePopulations() throws SQLException { + Map ret = new HashMap<>(); + + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement(""" + SELECT NODE_AFFINITY, COUNT(*) + FROM EC_DOMAIN + GROUP BY NODE_AFFINITY + WHERE NODE_AFFINITY > 0 + """)) { + var rs = query.executeQuery(); + while (rs.next()) { + ret.put(rs.getInt(1), rs.getInt(2)); + } + } + + for (var node : nodeConfigurationService.getAll()) { + if (isNodeExcluded(node)) { + ret.remove(node.node()); + } else { + ret.putIfAbsent(node.node(), 0); + } + } + + return ret; + } + + private boolean isNodeExcluded(NodeConfiguration node) { + return node.disabled(); + } + + //* 1. calculate sizes for each node using db + // + //2. rebalance + // + //-- find average + //-- calculate surplus and deficit, with a NN% tolerance + //-- create instructions for redistribution + // + //3. instruct each executor to transfer data: + // + //-- transfer domain data + //-- append to receiver crawler log + //-- instruct donor to delete file + // + //4. regenerate crawler logs based on present files on all donor nodes */ + + public record Sur(int n, int c) implements Comparable { + @Override + public int compareTo(@NotNull RebalanceActor.Sur o) { + int d = Integer.compare(o.c, c); + if (d != 0) + return d; + + return Integer.compare(n, o.n); + } + } + public record Def(int n, int c) implements Comparable { + + @Override + public int compareTo(@NotNull RebalanceActor.Def o) { + int d = Integer.compare(o.c, c); + if (d != 0) + return d; + + return Integer.compare(n, o.n); + } + } + + public record Give(int donor, int dest, int c) { + + } +} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlActorService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlActorService.java deleted file mode 100644 index 8d3df93a..00000000 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlActorService.java +++ /dev/null @@ -1,103 +0,0 @@ -package nu.marginalia.control.node.svc; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import nu.marginalia.client.Context; -import nu.marginalia.control.Redirects; -import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.executor.model.ActorRunState; -import spark.Request; -import spark.Response; -import spark.Spark; - -import java.util.List; - -@Singleton -public class ControlActorService { - - private final ExecutorClient executorClient; - - @Inject - public ControlActorService(ExecutorClient executorClient) { - this.executorClient = executorClient; - } - - public void register() { - Spark.post("/public/nodes/:node/storage/:fid/crawl", this::triggerCrawling, Redirects.redirectToActors); - Spark.post("/public/nodes/:node/storage/:fid/process", this::triggerProcessing, Redirects.redirectToActors); - Spark.post("/public/nodes/:node/storage/:fid/process-and-load", this::triggerProcessingWithLoad, Redirects.redirectToActors); - Spark.post("/public/nodes/:node/storage/:fid/load", this::loadProcessedData, Redirects.redirectToActors); - Spark.post("/public/nodes/:node/storage/:fid/restore-backup", this::restoreBackup, Redirects.redirectToActors); - Spark.post("/public/nodes/:node/storage/specs", this::createCrawlSpecification, Redirects.redirectToStorage); - - Spark.post("/public/nodes/:node/fsms/:fsm/start", this::startFsm, Redirects.redirectToActors); - Spark.post("/public/nodes/:node/fsms/:fsm/stop", this::stopFsm, Redirects.redirectToActors); - - } - - public Object startFsm(Request req, Response rsp) throws Exception { - executorClient.startFsm(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fsm").toUpperCase()); - - return ""; - } - - public Object stopFsm(Request req, Response rsp) throws Exception { - executorClient.stopFsm(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fsm").toUpperCase()); - - return ""; - } - - public Object triggerCrawling(Request req, Response response) throws Exception { - executorClient.triggerCrawl(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid")); - - return ""; - } - - public Object triggerProcessing(Request req, Response response) throws Exception { - executorClient.triggerConvert(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid")); - - return ""; - } - - public Object triggerProcessingWithLoad(Request req, Response response) throws Exception { - executorClient.triggerProcessAndLoad(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid")); - - return ""; - } - - public Object loadProcessedData(Request req, Response response) throws Exception { - executorClient.loadProcessedData(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid")); - - return ""; - } - - public List getActorStates(Request req) { - return executorClient.getActorStates(Context.fromRequest(req), Integer.parseInt(req.params("node"))).states(); - } - - public Object createCrawlSpecification(Request request, Response response) throws Exception { - final String description = request.queryParams("description"); - final String url = request.queryParams("url"); - final String source = request.queryParams("source"); - - if ("db".equals(source)) { - executorClient.createCrawlSpecFromDb(Context.fromRequest(request), 0, description); - } - else if ("download".equals(source)) { - executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), 0, description, url); - } - else { - throw new IllegalArgumentException("Unknown source: " + source); - } - - return ""; - } - - public Object restoreBackup(Request req, Response response) throws Exception { - executorClient.restoreBackup(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fid")); - - return ""; - } - - -} \ No newline at end of file diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java index 37a44250..475eb0d8 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlFileStorageService.java @@ -2,7 +2,9 @@ package nu.marginalia.control.node.svc; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.client.Context; import nu.marginalia.control.Redirects; +import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import org.slf4j.Logger; @@ -19,16 +21,17 @@ import java.sql.SQLException; @Singleton public class ControlFileStorageService { private final FileStorageService fileStorageService; + private final ExecutorClient executorClient; private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject - public ControlFileStorageService( FileStorageService fileStorageService) + public ControlFileStorageService(FileStorageService fileStorageService, ExecutorClient executorClient) { this.fileStorageService = fileStorageService; + this.executorClient = executorClient; } public void register() throws IOException { - Spark.get("/public/storage/:id/file", this::downloadFileFromStorage); Spark.post("/public/storage/:fid/delete", this::flagFileForDeletionRequest, Redirects.redirectToStorage); } @@ -39,29 +42,5 @@ public class ControlFileStorageService { return ""; } - public Object downloadFileFromStorage(Request request, Response response) throws SQLException { - var fileStorageId = FileStorageId.parse(request.params("id")); - String filename = request.queryParams("name"); - Path root = fileStorageService.getStorage(fileStorageId).asPath(); - Path filePath = root.resolve(filename).normalize(); - - if (!filePath.startsWith(root)) { - response.status(403); - return ""; - } - - if (filePath.endsWith(".txt") || filePath.endsWith(".log")) response.type("text/plain"); - else response.type("application/octet-stream"); - - try (var is = Files.newInputStream(filePath)) { - is.transferTo(response.raw().getOutputStream()); - } - catch (IOException ex) { - logger.error("Failed to download file", ex); - throw new RuntimeException(ex); - } - - return ""; - } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java index e74369b7..52763292 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -14,7 +14,6 @@ import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.model.NodeConfiguration; import nu.marginalia.storage.FileStorageService; import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.executor.model.crawl.RecrawlParameters; import nu.marginalia.executor.model.load.LoadParameters; import nu.marginalia.renderer.RendererFactory; import nu.marginalia.service.id.ServiceId; @@ -25,9 +24,7 @@ import spark.Request; import spark.Response; import spark.Spark; -import java.io.FileNotFoundException; import java.io.IOException; -import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; import java.util.*; @@ -102,6 +99,7 @@ public class ControlNodeService { Spark.post("/public/nodes/:id/storage/:fid/delete", this::deleteFileStorage); Spark.post("/public/nodes/:id/storage/:fid/enable", this::enableFileStorage); Spark.post("/public/nodes/:id/storage/:fid/disable", this::disableFileStorage); + Spark.get("/public/nodes/:id/storage/:fid/transfer", this::downloadFileFromStorage); } @@ -145,10 +143,7 @@ public class ControlNodeService { final String source = request.queryParams("source"); int nodeId = Integer.parseInt(request.params("id")); - if ("db".equals(source)) { - executorClient.createCrawlSpecFromDb(Context.fromRequest(request), nodeId, description); - } - else if ("download".equals(source)) { + if ("download".equals(source)) { executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), nodeId, description, url); } else { @@ -174,11 +169,9 @@ public class ControlNodeService { if (toCrawl.size() != 1) throw new IllegalStateException(); - var specs = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_SPEC); - executorClient.triggerRecrawl(Context.fromRequest(request), nodeId, - new RecrawlParameters(toCrawl.get(0), specs)); + toCrawl.get(0)); return redirectToOverview(request); } @@ -360,6 +353,24 @@ public class ControlNodeService { ); } + public Object downloadFileFromStorage(Request request, Response response) throws IOException { + int nodeId = Integer.parseInt(request.params("id")); + var fileStorageId = FileStorageId.parse(request.params("fid")); + + String path = request.queryParams("path"); + + response.header("content-disposition", "attachment; filename=\""+path+"\""); + + if (path.endsWith(".txt") || path.endsWith(".log")) + response.type("text/plain"); + else + response.type("application/octet-stream"); + + executorClient.transferFile(Context.fromRequest(request), nodeId, fileStorageId, path, response.raw().getOutputStream()); + + return ""; + } + private Object getStorageBaseList(int nodeId) throws SQLException { List bases = new ArrayList<>(); diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/storage-details/files.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/storage-details/files.hdb index 403657db..94d38110 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/storage-details/files.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/storage-details/files.hdb @@ -8,7 +8,9 @@ {{#each files}} - {{filename}} + + {{filename}} + {{mTime}} {{size}} diff --git a/code/services-core/control-service/src/test/java/nu/marginalia/control/actor/rebalance/RebalanceActorTest.java b/code/services-core/control-service/src/test/java/nu/marginalia/control/actor/rebalance/RebalanceActorTest.java new file mode 100644 index 00000000..3cdda156 --- /dev/null +++ b/code/services-core/control-service/src/test/java/nu/marginalia/control/actor/rebalance/RebalanceActorTest.java @@ -0,0 +1,64 @@ +package nu.marginalia.control.actor.rebalance; + +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +class RebalanceActorTest { + RebalanceActor actor = new RebalanceActor(null, null, null, null); + + @Test + void calculateTransactions1_2() { + var transactions = actor.calculateTransactions(Map.of(1, 100, 2, 0)); + var expected = List.of(new RebalanceActor.Give(1, 2, 50)); + + Assertions.assertEquals(expected, transactions); + } + + @Test + void calculateTransactions1_3() { + var transactions = actor.calculateTransactions(Map.of(1, 90, 2, 0, 3, 0)); + var expected = List.of( + new RebalanceActor.Give(1, 2, 30), + new RebalanceActor.Give(1, 3, 30) + ); + + Assertions.assertEquals(expected, transactions); + } + + @Test + void calculateTransactions2_3() { + var transactions = actor.calculateTransactions(Map.of(1, 30, 2, 30, 3, 0)); + var expected = List.of( + new RebalanceActor.Give(1, 3, 10), + new RebalanceActor.Give(2, 3, 10) + ); + + Assertions.assertEquals(expected, transactions); + } + + @Test + void calculateTransactionsEmpty() { + try { + actor.calculateTransactions(Map.of()); + Assertions.fail("Expected transition"); + } + catch (AbstractActorPrototype.ControlFlowException ex) { + Assertions.assertEquals("END", ex.getState()); + } + + try { + actor.calculateTransactions(Map.of(1, 100)); + Assertions.fail("Expected transition"); + } + catch (AbstractActorPrototype.ControlFlowException ex) { + Assertions.assertEquals("END", ex.getState()); + } + } +} \ No newline at end of file diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index d8cb39ad..d95b5070 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -24,6 +24,7 @@ java { dependencies { implementation project(':code:common:config') implementation project(':code:common:model') + implementation project(':code:common:process') implementation project(':code:common:db') implementation project(':code:common:service') diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java index fb73903b..b6dc4e18 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java @@ -10,15 +10,15 @@ import spark.Spark; @Singleton public class ActorApi { - private final ActorControlService actors; + private final ExecutorActorControlService actors; private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject - public ActorApi(ActorControlService actors) { + public ActorApi(ExecutorActorControlService actors) { this.actors = actors; } public Object startActorFromState(Request request, Response response) throws Exception { - Actor actor = translateActor(request.params("id")); + ExecutorActor actor = translateActor(request.params("id")); String state = request.params("state"); actors.startFromJSON(actor, state, request.body()); @@ -27,7 +27,7 @@ public class ActorApi { } public Object startActor(Request request, Response response) throws Exception { - Actor actor = translateActor(request.params("id")); + ExecutorActor actor = translateActor(request.params("id")); actors.startJSON(actor, request.body()); @@ -35,16 +35,16 @@ public class ActorApi { } public Object stopActor(Request request, Response response) { - Actor actor = translateActor(request.params("id")); + ExecutorActor actor = translateActor(request.params("id")); actors.stop(actor); return "OK"; } - public Actor translateActor(String name) { + public ExecutorActor translateActor(String name) { try { - return Actor.valueOf(name.toUpperCase()); + return ExecutorActor.valueOf(name.toUpperCase()); } catch (IllegalArgumentException ex) { logger.error("Unknown actor {}", name); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorControlService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorControlService.java deleted file mode 100644 index ef977c74..00000000 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorControlService.java +++ /dev/null @@ -1,145 +0,0 @@ -package nu.marginalia.actor; - -import com.google.gson.Gson; -import com.google.inject.Inject; -import com.google.inject.Singleton; -import lombok.SneakyThrows; -import nu.marginalia.actor.monitor.*; -import nu.marginalia.actor.proc.*; -import nu.marginalia.actor.prototype.AbstractActorPrototype; -import nu.marginalia.actor.state.ActorStateInstance; -import nu.marginalia.actor.task.*; -import nu.marginalia.model.gson.GsonFactory; -import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.service.control.ServiceEventLog; -import nu.marginalia.service.server.BaseServiceParams; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.stream.Collectors; - -/** This class is responsible for starting and stopping the various actors in the responsible service */ -@Singleton -public class ActorControlService { - private final ServiceEventLog eventLog; - private final Gson gson; - private final MessageQueueFactory messageQueueFactory; - public Map stateMachines = new HashMap<>(); - public Map actorDefinitions = new HashMap<>(); - private final int node; - @Inject - public ActorControlService(MessageQueueFactory messageQueueFactory, - BaseServiceParams baseServiceParams, - ConvertActor convertActor, - ConvertAndLoadActor convertAndLoadActor, - CrawlActor crawlActor, - RecrawlActor recrawlActor, - RestoreBackupActor restoreBackupActor, - ConverterMonitorActor converterMonitorFSM, - CrawlerMonitorActor crawlerMonitorActor, - LoaderMonitorActor loaderMonitor, - MessageQueueMonitorActor messageQueueMonitor, - ProcessLivenessMonitorActor processMonitorFSM, - FileStorageMonitorActor fileStorageMonitorActor, - IndexConstructorMonitorActor indexConstructorMonitorActor, - TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor, - CrawlJobExtractorActor crawlJobExtractorActor, - ExportDataActor exportDataActor, - TruncateLinkDatabase truncateLinkDatabase - ) { - this.messageQueueFactory = messageQueueFactory; - this.eventLog = baseServiceParams.eventLog; - this.gson = GsonFactory.get(); - this.node = baseServiceParams.configuration.node(); - - register(Actor.CRAWL, crawlActor); - register(Actor.RECRAWL, recrawlActor); - register(Actor.CONVERT, convertActor); - register(Actor.RESTORE_BACKUP, restoreBackupActor); - register(Actor.CONVERT_AND_LOAD, convertAndLoadActor); - - register(Actor.PROC_INDEX_CONSTRUCTOR_SPAWNER, indexConstructorMonitorActor); - register(Actor.PROC_CONVERTER_SPAWNER, converterMonitorFSM); - register(Actor.PROC_LOADER_SPAWNER, loaderMonitor); - register(Actor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor); - - register(Actor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor); - register(Actor.MONITOR_PROCESS_LIVENESS, processMonitorFSM); - register(Actor.MONITOR_FILE_STORAGE, fileStorageMonitorActor); - - register(Actor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor); - register(Actor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor); - register(Actor.EXPORT_DATA, exportDataActor); - register(Actor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase); - } - - private void register(Actor process, AbstractActorPrototype graph) { - var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph); - sm.listen((function, param) -> logStateChange(process, function)); - - stateMachines.put(process, sm); - actorDefinitions.put(process, graph); - } - - private void logStateChange(Actor process, String state) { - eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state); - } - - public void startFrom(Actor process, String state) throws Exception { - eventLog.logEvent("FSM-START", process.id()); - - stateMachines.get(process).initFrom(state); - } - - public void start(Actor process) throws Exception { - eventLog.logEvent("FSM-START", process.id()); - - stateMachines.get(process).init(); - } - - public void startFrom(Actor process, String state, Object arg) throws Exception { - eventLog.logEvent("FSM-START", process.id()); - - stateMachines.get(process).initFrom(state, gson.toJson(arg)); - } - - public void startFromJSON(Actor process, String state, String json) throws Exception { - eventLog.logEvent("FSM-START", process.id()); - - stateMachines.get(process).initFrom(state, json); - } - - public void start(Actor process, Object arg) throws Exception { - eventLog.logEvent("FSM-START", process.id()); - - stateMachines.get(process).init(gson.toJson(arg)); - } - public void startJSON(Actor process, String json) throws Exception { - eventLog.logEvent("FSM-START", process.id()); - - stateMachines.get(process).init(json); - } - @SneakyThrows - public void stop(Actor process) { - eventLog.logEvent("FSM-STOP", process.id()); - - stateMachines.get(process).abortExecution(); - } - - public Map getActorStates() { - return stateMachines.entrySet().stream().collect( - Collectors.toMap( - Map.Entry::getKey, e -> e.getValue().getState()) - ); - } - - public boolean isDirectlyInitializable(Actor actor) { - return actorDefinitions.get(actor).isDirectlyInitializable(); - } - - public AbstractActorPrototype getActorDefinition(Actor actor) { - return actorDefinitions.get(actor); - } - -} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/Actor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActor.java similarity index 90% rename from code/services-core/executor-service/src/main/java/nu/marginalia/actor/Actor.java rename to code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActor.java index 10027f56..8aefba75 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/Actor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActor.java @@ -1,6 +1,6 @@ package nu.marginalia.actor; -public enum Actor { +public enum ExecutorActor { CRAWL, RECRAWL, CONVERT_AND_LOAD, @@ -16,6 +16,7 @@ public enum Actor { TRUNCATE_LINK_DATABASE, PROC_INDEX_CONSTRUCTOR_SPAWNER, CONVERT, + TRANSFER_DOMAINS, RESTORE_BACKUP; public String id() { diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorControlService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorControlService.java new file mode 100644 index 00000000..274e8dd1 --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorControlService.java @@ -0,0 +1,147 @@ +package nu.marginalia.actor; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import lombok.SneakyThrows; +import nu.marginalia.actor.monitor.*; +import nu.marginalia.actor.proc.*; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.actor.task.*; +import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.service.control.ServiceEventLog; +import nu.marginalia.service.server.BaseServiceParams; + +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +/** This class is responsible for starting and stopping the various actors in the responsible service */ +@Singleton +public class ExecutorActorControlService { + private final ServiceEventLog eventLog; + private final Gson gson; + private final MessageQueueFactory messageQueueFactory; + public Map stateMachines = new HashMap<>(); + public Map actorDefinitions = new HashMap<>(); + private final int node; + @Inject + public ExecutorActorControlService(MessageQueueFactory messageQueueFactory, + BaseServiceParams baseServiceParams, + ConvertActor convertActor, + ConvertAndLoadActor convertAndLoadActor, + CrawlActor crawlActor, + RecrawlActor recrawlActor, + RestoreBackupActor restoreBackupActor, + ConverterMonitorActor converterMonitorFSM, + CrawlerMonitorActor crawlerMonitorActor, + LoaderMonitorActor loaderMonitor, + MessageQueueMonitorActor messageQueueMonitor, + ProcessLivenessMonitorActor processMonitorFSM, + FileStorageMonitorActor fileStorageMonitorActor, + IndexConstructorMonitorActor indexConstructorMonitorActor, + TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor, + CrawlJobExtractorActor crawlJobExtractorActor, + ExportDataActor exportDataActor, + TruncateLinkDatabase truncateLinkDatabase, + TransferDomainsActor transferDomainsActor + ) { + this.messageQueueFactory = messageQueueFactory; + this.eventLog = baseServiceParams.eventLog; + this.gson = GsonFactory.get(); + this.node = baseServiceParams.configuration.node(); + + register(ExecutorActor.CRAWL, crawlActor); + register(ExecutorActor.RECRAWL, recrawlActor); + register(ExecutorActor.CONVERT, convertActor); + register(ExecutorActor.RESTORE_BACKUP, restoreBackupActor); + register(ExecutorActor.CONVERT_AND_LOAD, convertAndLoadActor); + + register(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER, indexConstructorMonitorActor); + register(ExecutorActor.PROC_CONVERTER_SPAWNER, converterMonitorFSM); + register(ExecutorActor.PROC_LOADER_SPAWNER, loaderMonitor); + register(ExecutorActor.PROC_CRAWLER_SPAWNER, crawlerMonitorActor); + + register(ExecutorActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor); + register(ExecutorActor.MONITOR_PROCESS_LIVENESS, processMonitorFSM); + register(ExecutorActor.MONITOR_FILE_STORAGE, fileStorageMonitorActor); + + register(ExecutorActor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor); + register(ExecutorActor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor); + register(ExecutorActor.EXPORT_DATA, exportDataActor); + register(ExecutorActor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase); + register(ExecutorActor.TRANSFER_DOMAINS, transferDomainsActor); + } + + private void register(ExecutorActor process, AbstractActorPrototype graph) { + var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph); + sm.listen((function, param) -> logStateChange(process, function)); + + stateMachines.put(process, sm); + actorDefinitions.put(process, graph); + } + + private void logStateChange(ExecutorActor process, String state) { + eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state); + } + + public void startFrom(ExecutorActor process, String state) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state); + } + + public void start(ExecutorActor process) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).init(); + } + + public void startFrom(ExecutorActor process, String state, Object arg) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state, gson.toJson(arg)); + } + + public void startFromJSON(ExecutorActor process, String state, String json) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).initFrom(state, json); + } + + public void start(ExecutorActor process, Object arg) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).init(gson.toJson(arg)); + } + public void startJSON(ExecutorActor process, String json) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + stateMachines.get(process).init(json); + } + @SneakyThrows + public void stop(ExecutorActor process) { + eventLog.logEvent("FSM-STOP", process.id()); + + stateMachines.get(process).abortExecution(); + } + + public Map getActorStates() { + return stateMachines.entrySet().stream().collect( + Collectors.toMap( + Map.Entry::getKey, e -> e.getValue().getState()) + ); + } + + public boolean isDirectlyInitializable(ExecutorActor actor) { + return actorDefinitions.get(actor).isDirectlyInitializable(); + } + + public AbstractActorPrototype getActorDefinition(ExecutorActor actor) { + return actorDefinitions.get(actor); + } + +} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/CrawlJobExtractorActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/CrawlJobExtractorActor.java index cc7a5658..f0e8b5c8 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/CrawlJobExtractorActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/CrawlJobExtractorActor.java @@ -9,6 +9,7 @@ import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorState; import nu.marginalia.crawlspec.CrawlSpecFileNames; import nu.marginalia.db.DbDomainStatsExportMultitool; +import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageBaseType; import nu.marginalia.storage.model.FileStorageType; @@ -28,20 +29,15 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype { private final Logger logger = LoggerFactory.getLogger(getClass()); // STATES - public static final String CREATE_FROM_DB = "CREATE_FROM_DB"; public static final String CREATE_FROM_LINK = "CREATE_FROM_LINK"; public static final String END = "END"; private final FileStorageService fileStorageService; - private final HikariDataSource dataSource; - @Inject public CrawlJobExtractorActor(ActorStateFactory stateFactory, - FileStorageService fileStorageService, - HikariDataSource dataSource + FileStorageService fileStorageService ) { super(stateFactory); this.fileStorageService = fileStorageService; - this.dataSource = dataSource; } public record CrawlJobExtractorArguments(String description) { } @@ -90,35 +86,4 @@ public class CrawlJobExtractorActor extends AbstractActorPrototype { ); } - - @ActorState(name = CREATE_FROM_DB, next = END, - resume = ActorResumeBehavior.ERROR, - description = """ - Spawns a CrawlJobExtractor process that loads data from the link database, and wait for it to finish. - """ - ) - public void createFromDB(CrawlJobExtractorArguments arg) throws Exception { - if (arg == null) { - error("This actor requires a CrawlJobExtractorArguments argument"); - } - - var base = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE); - var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description()); - - final Path path = CrawlSpecFileNames.resolve(storage); - - try (var dbTools = new DbDomainStatsExportMultitool(dataSource)) { - generateCrawlSpec( - path, - DomainSource.combined( - DomainSource.knownUrlsFromDb(dbTools), - DomainSource.fromCrawlQueue(dbTools) - ), - KnownUrlsCountSource.fromDb(dbTools, 200), - KnownUrlsListSource.justIndex() // TODO: hook in linkdb maybe? - ); - } - - } - } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java index 204253fe..72140509 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java @@ -13,17 +13,14 @@ import nu.marginalia.actor.state.ActorState; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.storage.model.FileStorageType; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mqapi.crawling.CrawlRequest; +import org.jetbrains.annotations.NotNull; import java.nio.file.Files; -import java.sql.SQLException; -import java.util.List; -import java.util.Optional; @Singleton public class RecrawlActor extends AbstractActorPrototype { @@ -42,8 +39,12 @@ public class RecrawlActor extends AbstractActorPrototype { @AllArgsConstructor @With @NoArgsConstructor public static class RecrawlMessage { - public List crawlSpecId = null; - public FileStorageId crawlStorageId = null; + /** The storage where the crawl data will be stored. If this contains existing crawl + * data, it will be consulted for e.g. e-tag comparisons. + */ + @NotNull + public FileStorageId crawlStorageId; + public long crawlerMsgId = 0L; }; @@ -52,8 +53,8 @@ public class RecrawlActor extends AbstractActorPrototype { return "Run the crawler with the given crawl spec using previous crawl data for a reference"; } - public static RecrawlMessage recrawlFromCrawlDataAndCralSpec(FileStorageId crawlData, List crawlSpec) { - return new RecrawlMessage(crawlSpec, crawlData, 0L); + public static RecrawlMessage recrawlFromCrawlDataAndCrawlSpec(FileStorageId crawlData) { + return new RecrawlMessage(crawlData, 0L); } @Inject @@ -83,28 +84,12 @@ public class RecrawlActor extends AbstractActorPrototype { var crawlStorage = storageService.getStorage(recrawlMessage.crawlStorageId); - for (var specs : recrawlMessage.crawlSpecId) { - FileStorage specStorage = storageService.getStorage(specs); - - if (specStorage == null) error("Bad storage id"); - if (specStorage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + specStorage.type()); - } - - if (crawlStorage == null) error("Bad storage id"); if (crawlStorage.type() != FileStorageType.CRAWL_DATA) error("Bad storage type " + crawlStorage.type()); Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log")); - return recrawlMessage - .withCrawlSpecId(recrawlMessage.crawlSpecId); - } - - private Optional getSpec(FileStorage crawlStorage) throws SQLException { - return storageService.getSourceFromStorage(crawlStorage) - .stream() - .filter(storage -> storage.type().equals(FileStorageType.CRAWL_SPEC)) - .findFirst(); + return recrawlMessage; } @ActorState(name = CRAWL, @@ -117,7 +102,7 @@ public class RecrawlActor extends AbstractActorPrototype { public RecrawlMessage crawl(RecrawlMessage recrawlMessage) throws Exception { // Pre-send crawl request - var request = new CrawlRequest(recrawlMessage.crawlSpecId, recrawlMessage.crawlStorageId); + var request = new CrawlRequest(null, recrawlMessage.crawlStorageId); long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request)); return recrawlMessage.withCrawlerMsgId(id); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java index 87dc0f78..43568b9e 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RestoreBackupActor.java @@ -5,7 +5,7 @@ import nu.marginalia.actor.ActorStateFactory; import nu.marginalia.actor.prototype.AbstractActorPrototype; import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorState; -import nu.marginalia.actor.Actor; +import nu.marginalia.actor.ExecutorActor; import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.svc.BackupService; import nu.marginalia.storage.model.FileStorageId; @@ -43,7 +43,7 @@ public class RestoreBackupActor extends AbstractActorPrototype { backupService.restoreBackup(id); mqPersistence.sendNewMessage( - Actor.CONVERT_AND_LOAD.id() + ":" + node, + ExecutorActor.CONVERT_AND_LOAD.id() + ":" + node, null, null, ConvertAndLoadActor.REPARTITION, diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java new file mode 100644 index 00000000..d38019b9 --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/TransferDomainsActor.java @@ -0,0 +1,182 @@ +package nu.marginalia.actor.task; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.zaxxer.hikari.HikariDataSource; +import lombok.AllArgsConstructor; +import lombok.NoArgsConstructor; +import lombok.With; +import nu.marginalia.actor.ActorStateFactory; +import nu.marginalia.actor.prototype.AbstractActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorState; +import nu.marginalia.client.Context; +import nu.marginalia.executor.client.ExecutorClient; +import nu.marginalia.mq.outbox.MqOutbox; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqapi.ProcessInboxNames; +import nu.marginalia.process.ProcessOutboxes; +import nu.marginalia.process.log.WorkLog; +import nu.marginalia.service.module.ServiceConfiguration; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageBaseType; +import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.nio.file.StandardOpenOption; +import java.nio.file.attribute.PosixFilePermissions; +import java.util.UUID; +import java.util.zip.GZIPOutputStream; + +@Singleton +public class TransferDomainsActor extends AbstractActorPrototype { + + + // STATES + public static final String INITIAL = "INITIAL"; + public static final String TRANSFER_DOMAINS = "TRANSFER-DOMAINS"; + public static final String UPDATE_DONOR_LOG = "UPDATE_DONOR_LOG"; + + public static final String END = "END"; + private final FileStorageService storageService; + private final ExecutorClient executorClient; + private final MqPersistence persistence; + private final HikariDataSource dataSource; + private final Logger logger = LoggerFactory.getLogger(getClass()); + + private final int nodeId; + private final String executorServiceName; + + @AllArgsConstructor @With @NoArgsConstructor + public static class Message { + int sourceNode; + int count; + }; + + @Override + public String describe() { + return "Transfers domains between nodes' crawl data sets"; + } + + @Inject + public TransferDomainsActor(ActorStateFactory stateFactory, + ServiceConfiguration configuration, + FileStorageService storageService, + ExecutorClient executorClient, + MqPersistence persistence, + HikariDataSource dataSource) + { + super(stateFactory); + this.storageService = storageService; + this.executorClient = executorClient; + this.persistence = persistence; + this.dataSource = dataSource; + this.nodeId = configuration.node(); + this.executorServiceName = configuration.serviceName(); + } + + @ActorState(name = INITIAL, + next = TRANSFER_DOMAINS, + description = """ + Ensure preconditions are met + """) + public Message init(Message message) throws Exception { + var storages = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA); + + // Ensure crawl data exists to receive into + if (storages.isEmpty()) { + var storage = storageService.allocateTemporaryStorage( + storageService.getStorageBase(FileStorageBaseType.STORAGE), + FileStorageType.CRAWL_DATA, + "crawl-data", + "Crawl Data" + ); + storageService.enableFileStorage(storage.id()); + + } + + return message; + } + + @ActorState(name = TRANSFER_DOMAINS, + next = UPDATE_DONOR_LOG, + resume = ActorResumeBehavior.ERROR, + description = """ + Do the needful + """ + ) + public Message transferData(Message message) throws Exception { + var storageId = storageService.getActiveFileStorages(FileStorageType.CRAWL_DATA).get(0); + var storage = storageService.getStorage(storageId); + + var spec = executorClient.getTransferSpec(Context.internal(), message.sourceNode, message.count); + if (spec.size() == 0) { + transition("END", "NOTHING TO TRANSFER"); + } + + Path basePath = storage.asPath(); + try (var workLog = new WorkLog(basePath.resolve("crawler.log")); + var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE ID=?"); + ) { + for (var item : spec.items()) { + logger.info("{}", item); + logger.info("Transferring {}", item.domainName()); + + Path dest = basePath.resolve(item.path()); + Files.createDirectories(dest.getParent()); + try (var fileStream = Files.newOutputStream(dest)) { + executorClient.transferFile(Context.internal(), + message.sourceNode, + item.fileStorageId(), + item.path(), + fileStream); + + stmt.setInt(1, nodeId); + stmt.setInt(2, item.domainId()); + stmt.executeUpdate(); + + executorClient.yieldDomain(Context.internal(), message.sourceNode, item); + workLog.setJobToFinished(item.domainName(), item.path(), 1); + } + catch (IOException ex) { + Files.deleteIfExists(dest); + error(ex); + } + catch (Exception ex) { + error(ex); + } + } + } + + return message; + } + + @ActorState(name = UPDATE_DONOR_LOG, + next = END, + resume = ActorResumeBehavior.ERROR, + description = """ + Do the needful + """ + ) + public void updateDonorLog(Message message) throws InterruptedException { + var outbox = new MqOutbox(persistence, executorServiceName, message.sourceNode, + getClass().getSimpleName(), nodeId, UUID.randomUUID()); + + try { + outbox.send("PRUNE-CRAWL-DATA", ":-)"); + } catch (Exception e) { + throw new RuntimeException(e); + } finally { + outbox.stop(); + } + } +} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java index 1c539b5e..ecfe901d 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java @@ -2,24 +2,22 @@ package nu.marginalia.executor; import com.google.gson.Gson; import com.google.inject.Inject; -import lombok.SneakyThrows; -import nu.marginalia.actor.Actor; +import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ActorApi; -import nu.marginalia.actor.ActorControlService; +import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.state.ActorState; import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.executor.model.ActorRunState; import nu.marginalia.executor.model.ActorRunStates; -import nu.marginalia.executor.storage.FileStorageContent; -import nu.marginalia.executor.storage.FileStorageFile; import nu.marginalia.executor.svc.BackupService; import nu.marginalia.executor.svc.ProcessingService; import nu.marginalia.executor.svc.SideloadService; +import nu.marginalia.executor.svc.TransferService; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; import nu.marginalia.service.server.mq.MqNotification; +import nu.marginalia.service.server.mq.MqRequest; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import spark.Request; @@ -27,37 +25,35 @@ import spark.Response; import spark.Spark; import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; import java.sql.SQLException; -import java.time.format.DateTimeFormatter; -import java.util.ArrayList; import java.util.Comparator; -import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; // Weird name for this one to not have clashes with java.util.concurrent.ExecutorService public class ExecutorSvc extends Service { private final BaseServiceParams params; - private final ActorControlService actorControlService; + private final ExecutorActorControlService actorControlService; private final FileStorageService fileStorageService; + private final TransferService transferService; private static final Logger logger = LoggerFactory.getLogger(ExecutorSvc.class); @Inject public ExecutorSvc(BaseServiceParams params, - ActorControlService actorControlService, + ExecutorActorControlService actorControlService, ProcessingService processingService, SideloadService sideloadService, BackupService backupService, FileStorageService fileStorageService, Gson gson, + TransferService transferService, ActorApi actorApi) { super(params); this.params = params; this.actorControlService = actorControlService; this.fileStorageService = fileStorageService; + this.transferService = transferService; Spark.post("/actor/:id/start", actorApi::startActor); Spark.post("/actor/:id/start/:state", actorApi::startActorFromState); @@ -68,7 +64,6 @@ public class ExecutorSvc extends Service { Spark.post("/process/recrawl", processingService::startRecrawl); Spark.post("/process/convert/:fid", processingService::startConversion); Spark.post("/process/convert-load/:fid", processingService::startConvertLoad); - Spark.post("/process/crawl-spec/from-db", processingService::createCrawlSpecFromDb); Spark.post("/process/crawl-spec/from-download", processingService::createCrawlSpecFromDownload); Spark.post("/process/load", processingService::startLoad); Spark.post("/process/adjacency-calculation", processingService::startAdjacencyCalculation); @@ -78,47 +73,30 @@ public class ExecutorSvc extends Service { Spark.post("/sideload/encyclopedia", sideloadService::sideloadEncyclopedia); Spark.post("/backup/:fid/restore", backupService::restore); - Spark.get("/storage/:fid", this::listFiles, gson::toJson); + Spark.get("/storage/:fid", transferService::listFiles, gson::toJson); + Spark.get("/transfer/file/:fid", transferService::transferFile); + + Spark.get("/transfer/spec", transferService::getTransferSpec, gson::toJson); + Spark.post("/transfer/yield", transferService::yieldDomain); } @MqNotification(endpoint="FIRST-BOOT") public void setUpDefaultActors(String message) throws Exception { logger.info("Initializing default actors"); - actorControlService.start(Actor.MONITOR_PROCESS_LIVENESS); - actorControlService.start(Actor.MONITOR_FILE_STORAGE); - actorControlService.start(Actor.MONITOR_MESSAGE_QUEUE); - actorControlService.start(Actor.PROC_CONVERTER_SPAWNER); - actorControlService.start(Actor.PROC_CRAWLER_SPAWNER); - actorControlService.start(Actor.PROC_INDEX_CONSTRUCTOR_SPAWNER); - actorControlService.start(Actor.PROC_LOADER_SPAWNER); + actorControlService.start(ExecutorActor.MONITOR_PROCESS_LIVENESS); + actorControlService.start(ExecutorActor.MONITOR_FILE_STORAGE); + actorControlService.start(ExecutorActor.MONITOR_MESSAGE_QUEUE); + actorControlService.start(ExecutorActor.PROC_CONVERTER_SPAWNER); + actorControlService.start(ExecutorActor.PROC_CRAWLER_SPAWNER); + actorControlService.start(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER); + actorControlService.start(ExecutorActor.PROC_LOADER_SPAWNER); } + @MqRequest(endpoint="PRUNE-CRAWL-DATA") + public String pruneCrawlData(String message) throws SQLException, IOException { + transferService.pruneCrawlDataMqEndpoint(); - - private FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException { - FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); - - var storage = fileStorageService.getStorage(fileStorageId); - - List files; - - try (var fs = Files.list(storage.asPath())) { - files = fs.filter(Files::isRegularFile) - .map(this::createFileModel) - .sorted(Comparator.comparing(FileStorageFile::name)) - .toList(); - } - - return new FileStorageContent(files); - } - - @SneakyThrows - private FileStorageFile createFileModel(Path path) { - return new FileStorageFile( - path.toFile().getName(), - Files.size(path), - Files.getLastModifiedTime(path).toInstant().toString() - ); + return "OK"; } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java index 4b6116d9..601f70ff 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java @@ -1,24 +1,24 @@ package nu.marginalia.executor.svc; import com.google.inject.Inject; -import nu.marginalia.actor.Actor; -import nu.marginalia.actor.ActorControlService; +import nu.marginalia.actor.ExecutorActor; +import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.RestoreBackupActor; import nu.marginalia.storage.model.FileStorageId; import spark.Request; import spark.Response; public class BackupService { - private final ActorControlService actorControlService; + private final ExecutorActorControlService actorControlService; @Inject - public BackupService(ActorControlService actorControlService) { + public BackupService(ExecutorActorControlService actorControlService) { this.actorControlService = actorControlService; } public Object restore(Request request, Response response) throws Exception { var fid = FileStorageId.parse(request.params("fid")); - actorControlService.startFrom(Actor.RESTORE_BACKUP, RestoreBackupActor.RESTORE, fid); + actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, RestoreBackupActor.RESTORE, fid); return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java index abdbef17..0c3d4eb8 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java @@ -2,58 +2,54 @@ package nu.marginalia.executor.svc; import com.google.gson.Gson; import com.google.inject.Inject; -import nu.marginalia.actor.Actor; -import nu.marginalia.actor.ActorControlService; +import nu.marginalia.actor.ExecutorActor; +import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.ConvertActor; import nu.marginalia.actor.task.ConvertAndLoadActor; import nu.marginalia.actor.task.CrawlJobExtractorActor; import nu.marginalia.actor.task.RecrawlActor; import nu.marginalia.storage.model.FileStorageId; -import nu.marginalia.executor.model.crawl.RecrawlParameters; import nu.marginalia.executor.model.load.LoadParameters; import spark.Request; import spark.Response; public class ProcessingService { - private final ActorControlService actorControlService; + private final ExecutorActorControlService actorControlService; private final Gson gson; @Inject - public ProcessingService(ActorControlService actorControlService, + public ProcessingService(ExecutorActorControlService actorControlService, Gson gson) { this.actorControlService = actorControlService; this.gson = gson; } public Object startRecrawl(Request request, Response response) throws Exception { - var params = gson.fromJson(request.body(), RecrawlParameters.class); + var crawlId = gson.fromJson(request.body(), FileStorageId.class); actorControlService.start( - Actor.RECRAWL, - RecrawlActor.recrawlFromCrawlDataAndCralSpec( - params.crawlDataId(), - params.crawlSpecIds() - ) + ExecutorActor.RECRAWL, + RecrawlActor.recrawlFromCrawlDataAndCrawlSpec(crawlId) ); return ""; } public Object startCrawl(Request request, Response response) throws Exception { - actorControlService.start(Actor.CRAWL, FileStorageId.parse(request.params("fid"))); + actorControlService.start(ExecutorActor.CRAWL, FileStorageId.parse(request.params("fid"))); return ""; } public Object startConversion(Request request, Response response) throws Exception { - actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT, FileStorageId.parse(request.params("fid"))); + actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT, FileStorageId.parse(request.params("fid"))); return ""; } public Object startConvertLoad(Request request, Response response) throws Exception { actorControlService.start( - Actor.CONVERT_AND_LOAD, + ExecutorActor.CONVERT_AND_LOAD, FileStorageId.parse(request.params("fid")) ); return ""; @@ -65,7 +61,7 @@ public class ProcessingService { // Start the FSM from the intermediate state that triggers the load actorControlService.startFrom( - Actor.CONVERT_AND_LOAD, + ExecutorActor.CONVERT_AND_LOAD, ConvertAndLoadActor.LOAD, new ConvertAndLoadActor.Message(null, params.ids(), 0L, @@ -76,20 +72,12 @@ public class ProcessingService { } public Object startAdjacencyCalculation(Request request, Response response) throws Exception { - actorControlService.start(Actor.ADJACENCY_CALCULATION); - return ""; - } - - public Object createCrawlSpecFromDb(Request request, Response response) throws Exception { - actorControlService.startFrom(Actor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_DB, - new CrawlJobExtractorActor.CrawlJobExtractorArguments( - request.queryParamOrDefault("description", "")) - ); + actorControlService.start(ExecutorActor.ADJACENCY_CALCULATION); return ""; } public Object createCrawlSpecFromDownload(Request request, Response response) throws Exception { - actorControlService.startFrom(Actor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_LINK, + actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_LINK, new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL( request.queryParamOrDefault("description", ""), request.queryParamOrDefault("url", "")) diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java index fb09f460..ec223ed9 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java @@ -1,32 +1,32 @@ package nu.marginalia.executor.svc; import com.google.inject.Inject; -import nu.marginalia.actor.Actor; -import nu.marginalia.actor.ActorControlService; +import nu.marginalia.actor.ExecutorActor; +import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.ConvertActor; import spark.Request; import spark.Response; public class SideloadService { - private final ActorControlService actorControlService; + private final ExecutorActorControlService actorControlService; @Inject - public SideloadService(ActorControlService actorControlService) { + public SideloadService(ExecutorActorControlService actorControlService) { this.actorControlService = actorControlService; } public Object sideloadDirtree(Request request, Response response) throws Exception { - actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT_DIRTREE, request.queryParams("path")); + actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_DIRTREE, request.queryParams("path")); return ""; } public Object sideloadEncyclopedia(Request request, Response response) throws Exception { - actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT_ENCYCLOPEDIA, request.queryParams("path")); + actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_ENCYCLOPEDIA, request.queryParams("path")); return ""; } public Object sideloadStackexchange(Request request, Response response) throws Exception { - actorControlService.startFrom(Actor.CONVERT, ConvertActor.CONVERT_STACKEXCHANGE, request.queryParams("path")); + actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_STACKEXCHANGE, request.queryParams("path")); return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java new file mode 100644 index 00000000..ca9e90e2 --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java @@ -0,0 +1,172 @@ +package nu.marginalia.executor.svc; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.zaxxer.hikari.HikariDataSource; +import lombok.SneakyThrows; +import nu.marginalia.executor.model.transfer.TransferItem; +import nu.marginalia.executor.model.transfer.TransferSpec; +import nu.marginalia.executor.storage.FileStorageContent; +import nu.marginalia.executor.storage.FileStorageFile; +import nu.marginalia.process.log.WorkLog; +import nu.marginalia.service.module.ServiceConfiguration; +import nu.marginalia.storage.FileStorageService; +import nu.marginalia.storage.model.FileStorageId; +import nu.marginalia.storage.model.FileStorageType; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import spark.Request; +import spark.Response; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +public class TransferService { + private final Gson gson; + private final FileStorageService fileStorageService; + private final HikariDataSource dataSource; + private final int nodeId; + + private static final Logger logger = LoggerFactory.getLogger(TransferService.class); + @Inject + public TransferService( + Gson gson, + FileStorageService fileStorageService, + HikariDataSource dataSource, + ServiceConfiguration config) + { + this.gson = gson; + this.fileStorageService = fileStorageService; + this.dataSource = dataSource; + this.nodeId = config.node(); + } + + public Object transferFile(Request request, Response response) throws SQLException, IOException { + FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); + + var fileStorage = fileStorageService.getStorage(fileStorageId); + + Path basePath = fileStorage.asPath(); + // This is not a public API so injection isn't a concern + Path filePath = basePath.resolve(request.queryParams("path")); + + response.type("application/octet-stream"); + FileUtils.copyFile(filePath.toFile(), response.raw().getOutputStream()); + return ""; + } + + + public FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException { + FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); + + var storage = fileStorageService.getStorage(fileStorageId); + + List files; + + try (var fs = Files.list(storage.asPath())) { + files = fs.filter(Files::isRegularFile) + .map(this::createFileModel) + .sorted(Comparator.comparing(FileStorageFile::name)) + .toList(); + } + + return new FileStorageContent(files); + } + + @SneakyThrows + private FileStorageFile createFileModel(Path path) { + return new FileStorageFile( + path.toFile().getName(), + Files.size(path), + Files.getLastModifiedTime(path).toInstant().toString() + ); + } + + public TransferSpec getTransferSpec(Request request, Response response) throws SQLException { + List fileStorageIds = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_DATA); + if (fileStorageIds.isEmpty()) { + logger.warn("No ACTIVE crawl data"); + return new TransferSpec(); + } + int count = Integer.parseInt(request.queryParams("count")); + + logger.info("Preparing a transfer of {} domains", count); + + List items = new ArrayList<>(); + var storage = fileStorageService.getStorage(fileStorageIds.get(0)); + + try (var conn = dataSource.getConnection(); + var query = conn.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=? AND NODE_AFFINITY=" + nodeId) + ) { + for (var item : WorkLog.iterable(storage.asPath().resolve("crawler.log"))) { + if (items.size() >= count) + break; + + if (!Files.isRegularFile(storage.asPath().resolve(item.relPath()))) { + logger.info("Ignoring absent item {}", item); + continue; + } + + query.setString(1, item.id()); + var rs = query.executeQuery(); + if (rs.next()) { + items.add(new TransferItem( + item.id(), + rs.getInt(1), + fileStorageIds.get(0), + item.relPath() + )); + } + else { + logger.info("Rejected {}", item); + } + } + } + + logger.info("Found {} eligible domains", items.size()); + + return new TransferSpec(items); + } + + public Object yieldDomain(Request request, Response response) throws SQLException, IOException { + var item = gson.fromJson(request.body(), TransferItem.class); + var storage = fileStorageService.getStorage(item.fileStorageId()); + Files.delete(storage.asPath().resolve(storage.asPath().resolve(item.path()))); + return ""; + } + + public void pruneCrawlDataMqEndpoint() throws SQLException, IOException { + List fileStorageIds = fileStorageService.getActiveFileStorages(nodeId, FileStorageType.CRAWL_DATA); + if (fileStorageIds.isEmpty()) { + return; + } + var storage = fileStorageService.getStorage(fileStorageIds.get(0)); + + Path newCrawlLogPath = storage.asPath().resolve("crawler.log-new"); + Path oldCrawlLogPath = storage.asPath().resolve("crawler.log"); + + int pruned = 0; + try (var newWorkLog = new WorkLog(newCrawlLogPath)) { + for (var item : WorkLog.iterable(oldCrawlLogPath)) { + if (Files.exists(storage.asPath().resolve(item.relPath()))) { + newWorkLog.setJobToFinished(item.id(), item.path(), item.cnt()); + } + else { + pruned++; + } + } + } + if (pruned > 0) { + logger.info("Pruned {} items from the crawl log!", pruned); + } + + Files.move(newCrawlLogPath, oldCrawlLogPath, StandardCopyOption.REPLACE_EXISTING); + } +} diff --git a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java b/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java index 63d1f87c..de12a52d 100644 --- a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java +++ b/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java @@ -4,8 +4,8 @@ import com.google.inject.AbstractModule; import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Provides; -import nu.marginalia.actor.Actor; -import nu.marginalia.actor.ActorControlService; +import nu.marginalia.actor.ExecutorActor; +import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.CrawlJobExtractorActor; import nu.marginalia.client.Context; import nu.marginalia.client.route.RouteProvider; @@ -14,7 +14,6 @@ import nu.marginalia.process.ProcessService; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageId; import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.executor.model.crawl.RecrawlParameters; import nu.marginalia.index.client.IndexClient; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.inbox.MqAsynchronousInbox; @@ -69,43 +68,43 @@ public class ExecutorSvcApiIntegrationTest { @Test public void startStartActor() throws Exception { testInstances.client.startFsm(Context.internal(), 0, "crawl"); - Mockito.verify(testInstances.actorControlService).startJSON(Actor.CRAWL, "\"\""); + Mockito.verify(testInstances.actorControlService).startJSON(ExecutorActor.CRAWL, "\"\""); } @Test public void startStopActor() { testInstances.client.stopFsm(Context.internal(), 0, "crawl"); - Mockito.verify(testInstances.actorControlService).stop(Actor.CRAWL); + Mockito.verify(testInstances.actorControlService).stop(ExecutorActor.CRAWL); } @Test public void triggerCrawl() throws Exception { testInstances.client.triggerCrawl(Context.internal(), 0, "1"); - Mockito.verify(testInstances.actorControlService).start(eq(Actor.CRAWL), any()); + Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CRAWL), any()); } @Test public void triggerRecrawl() throws Exception { testInstances.client.triggerRecrawl(Context.internal(), 0, - new RecrawlParameters(new FileStorageId(0), List.of())); + new FileStorageId(0)); - Mockito.verify(testInstances.actorControlService).start(eq(Actor.RECRAWL), any()); + Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.RECRAWL), any()); } @Test public void triggerConvert() throws Exception { testInstances.client.triggerConvert(Context.internal(), 0, "1"); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT"), any()); + Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT"), any()); } @Test public void triggerProcessAndLoad() throws Exception { testInstances.client.triggerProcessAndLoad(Context.internal(), 0, "1"); - Mockito.verify(testInstances.actorControlService).start(eq(Actor.CONVERT_AND_LOAD), any()); + Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CONVERT_AND_LOAD), any()); } @Test @@ -113,7 +112,7 @@ public class ExecutorSvcApiIntegrationTest { testInstances.client.loadProcessedData(Context.internal(), 0, "1"); Mockito.verify(testInstances.actorControlService).startFrom( - eq(Actor.CONVERT_AND_LOAD), + eq(ExecutorActor.CONVERT_AND_LOAD), eq("LOAD"), any()); } @@ -122,45 +121,39 @@ public class ExecutorSvcApiIntegrationTest { public void calculateAdjacencies() throws Exception { testInstances.client.calculateAdjacencies(Context.internal(), 0); - Mockito.verify(testInstances.actorControlService).start(eq(Actor.ADJACENCY_CALCULATION)); + Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.ADJACENCY_CALCULATION)); } @Test public void sideloadDirtree() throws Exception { testInstances.client.sideloadDirtree(Context.internal(), 0, Path.of("/tmp/test")); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT_DIRTREE"), eq("/tmp/test")); + Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_DIRTREE"), eq("/tmp/test")); } @Test public void sideloadEncyclopedia() throws Exception { testInstances.client.sideloadEncyclopedia(Context.internal(), 0, Path.of("/tmp/test")); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT_ENCYCLOPEDIA"), eq("/tmp/test")); + Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_ENCYCLOPEDIA"), eq("/tmp/test")); } @Test public void sideloadStackexchange() throws Exception { testInstances.client.sideloadStackexchange(Context.internal(), 0, Path.of("/tmp/test")); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CONVERT), eq("CONVERT_STACKEXCHANGE"), eq("/tmp/test")); - } - - @Test - public void testCreateCrawlSpecFromDb() throws Exception { - testInstances.client.createCrawlSpecFromDb(Context.internal(), 0, "Lorem Ipsum"); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_DB"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArguments("Lorem Ipsum"))); + Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_STACKEXCHANGE"), eq("/tmp/test")); } @Test public void testCreateCrawlSpecFromUrl() throws Exception { testInstances.client.createCrawlSpecFromDownload(Context.internal(), 0, "Lorem Ipsum", "http://www.example.com"); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_LINK"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL("Lorem Ipsum", "http://www.example.com"))); + Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_LINK"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL("Lorem Ipsum", "http://www.example.com"))); } @Test public void backupRestore() throws Exception { testInstances.client.restoreBackup(Context.internal(), 0, "1"); - Mockito.verify(testInstances.actorControlService).startFrom(eq(Actor.RESTORE_BACKUP), eq("RESTORE"), eq(new FileStorageId(1))); + Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RESTORE_BACKUP), eq("RESTORE"), eq(new FileStorageId(1))); } } @@ -170,14 +163,14 @@ class TestInstances { @Inject ExecutorClient client; @Inject - ActorControlService actorControlService; + ExecutorActorControlService actorControlService; } class TestModule extends AbstractModule { @Override public void configure() { System.setProperty("service-name", "test"); - bind(ActorControlService.class).toInstance(Mockito.mock(ActorControlService.class)); + bind(ExecutorActorControlService.class).toInstance(Mockito.mock(ExecutorActorControlService.class)); bind(FileStorageService.class).toInstance(Mockito.mock(FileStorageService.class)); bind(ProcessService.class).toInstance(Mockito.mock(ProcessService.class)); bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class)); diff --git a/run/readme.md b/run/readme.md index d9f5ee7d..a4523e9e 100644 --- a/run/readme.md +++ b/run/readme.md @@ -66,7 +66,7 @@ data from https://downloads.marginalia.nu/ and extract it to the correct locatio The system will pick the data up automatically. ```shell -$ run/download-samples l +$ run/download-samples.sh l ``` Four sets are available: