From 460998d51292e42343741aed9182037f24ef1e4a Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Fri, 25 Aug 2023 12:52:54 +0200 Subject: [PATCH] (index) Move index construction to separate process. This provides a much cleaner separation of concerns, and makes it possible to get rid of a lot of the gunkier parts of the index service. It will also permit lowering the Xmx on the index service a fair bit, so we can get CompressedOOps again :D --- build.gradle | 4 + .../marginalia/mqapi/ProcessInboxNames.java | 2 + .../mqapi/index/CreateIndexRequest.java | 5 + .../nu/marginalia/mqapi/index/IndexName.java | 7 + .../process/control/ProcessHeartbeat.java | 9 + .../process/control/ProcessTaskHeartbeat.java | 190 +++++++++++++++ .../features-index/index-forward/build.gradle | 2 +- .../index/forward/ForwardIndexConverter.java | 10 +- .../index/forward/ForwardIndexFileNames.java | 28 +++ .../forward/ForwardIndexConverterTest.java | 10 +- .../index/journal/IndexJournalFileNames.java | 9 + .../features-index/index-reverse/build.gradle | 2 +- .../index/full/ReverseIndexFullConverter.java | 9 +- .../index/full/ReverseIndexFullFileNames.java | 28 +++ .../priority/ReverseIndexPrioFileNames.java | 28 +++ .../ReverseIndexPriorityConverter.java | 8 +- .../ReverseIndexFullConverterTest.java | 12 +- .../ReverseIndexFullConverterTest2.java | 22 +- .../ReverseIndexPriorityConverterTest2.java | 23 +- .../index-constructor-process/build.gradle | 57 +++++ .../index/IndexConstructorMain.java | 214 ++++++++++++++++ .../index/IndexConstructorModule.java | 14 ++ .../loader/LoaderIndexJournalWriter.java | 3 +- .../nu/marginalia/control/actor/Actor.java | 2 +- .../control/actor/ControlActors.java | 2 + .../monitor/AbstractProcessSpawnerActor.java | 6 +- .../monitor/IndexConstructorMonitorActor.java | 22 ++ .../actor/task/ConvertAndLoadActor.java | 102 ++++++-- .../control/model/ProcessHeartbeat.java | 1 + .../control/process/ProcessOutboxes.java | 8 + .../control/process/ProcessService.java | 1 + .../nu/marginalia/index/IndexService.java | 29 +-- .../index/IndexServicesFactory.java | 228 +++--------------- .../marginalia/index/index/SearchIndex.java | 11 +- gradle/wrapper/gradle-wrapper.properties | 2 +- settings.gradle | 1 + 36 files changed, 809 insertions(+), 302 deletions(-) create mode 100644 code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/CreateIndexRequest.java create mode 100644 code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/IndexName.java create mode 100644 code/common/process/src/main/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java create mode 100644 code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexFileNames.java create mode 100644 code/features-index/index-journal/src/main/java/nu/marginallia/index/journal/IndexJournalFileNames.java create mode 100644 code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullFileNames.java create mode 100644 code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPrioFileNames.java create mode 100644 code/processes/index-constructor-process/build.gradle create mode 100644 code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java create mode 100644 code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java create mode 100644 code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/IndexConstructorMonitorActor.java diff --git a/build.gradle b/build.gradle index 11ef88b5..70984557 100644 --- a/build.gradle +++ b/build.gradle @@ -34,6 +34,10 @@ tasks.register('dist', Copy) { from tarTree("$buildDir/dist/crawl-job-extractor-process.tar") into "$projectDir/run/dist/" } + copy { + from tarTree("$buildDir/dist/index-construction-process.tar") + into "$projectDir/run/dist/" + } } } idea { diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java index 9ca91fe6..b7906c22 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/ProcessInboxNames.java @@ -4,4 +4,6 @@ public class ProcessInboxNames { public static final String CONVERTER_INBOX = "converter"; public static final String LOADER_INBOX = "loader"; public static final String CRAWLER_INBOX = "crawler"; + + public static final String INDEX_CONSTRUCTOR_INBOX = "index_constructor"; } diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/CreateIndexRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/CreateIndexRequest.java new file mode 100644 index 00000000..c0ab45a1 --- /dev/null +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/CreateIndexRequest.java @@ -0,0 +1,5 @@ +package nu.marginalia.mqapi.index; + +public record CreateIndexRequest(IndexName indexName) +{ +} diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/IndexName.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/IndexName.java new file mode 100644 index 00000000..c7925e50 --- /dev/null +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/index/IndexName.java @@ -0,0 +1,7 @@ +package nu.marginalia.mqapi.index; + +public enum IndexName { + FORWARD, + REVERSE_FULL, + REVERSE_PRIO +} diff --git a/code/common/process/src/main/java/nu/marginalia/process/control/ProcessHeartbeat.java b/code/common/process/src/main/java/nu/marginalia/process/control/ProcessHeartbeat.java index 82b2c95e..b683715d 100644 --- a/code/common/process/src/main/java/nu/marginalia/process/control/ProcessHeartbeat.java +++ b/code/common/process/src/main/java/nu/marginalia/process/control/ProcessHeartbeat.java @@ -19,6 +19,8 @@ public class ProcessHeartbeat { private final String processName; private final String processBase; private final String instanceUUID; + @org.jetbrains.annotations.NotNull + private final ProcessConfiguration configuration; private final HikariDataSource dataSource; @@ -35,6 +37,7 @@ public class ProcessHeartbeat { { this.processName = configuration.processName() + ":" + configuration.node(); this.processBase = configuration.processName(); + this.configuration = configuration; this.dataSource = dataSource; this.instanceUUID = configuration.instanceUuid().toString(); @@ -44,6 +47,12 @@ public class ProcessHeartbeat { Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown)); } + + public > ProcessTaskHeartbeat createProcessTaskHeartbeat(Class steps, String processName) { + return new ProcessTaskHeartbeat<>(steps, configuration, processName, dataSource); + } + + public void setProgress(double progress) { this.progress = (int) (progress * 100); } diff --git a/code/common/process/src/main/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java b/code/common/process/src/main/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java new file mode 100644 index 00000000..353307e9 --- /dev/null +++ b/code/common/process/src/main/java/nu/marginalia/process/control/ProcessTaskHeartbeat.java @@ -0,0 +1,190 @@ +package nu.marginalia.process.control; + + +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.ProcessConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** This object sends a heartbeat to the database every few seconds, + * updating with the progress of a task within a service. Progress is tracked by providing + * enumerations corresponding to the steps in the task. It's important they're arranged in the same + * order as the steps in the task in order to get an accurate progress tracking. + */ +public class ProcessTaskHeartbeat> implements AutoCloseable { + private final Logger logger = LoggerFactory.getLogger(ProcessTaskHeartbeat.class); + private final String taskName; + private final String taskBase; + private final String instanceUUID; + private final HikariDataSource dataSource; + + + private final Thread runnerThread; + private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1); + private final String serviceInstanceUUID; + private final int stepCount; + + private volatile boolean running = false; + private volatile int stepNum = 0; + private volatile String step = "-"; + + ProcessTaskHeartbeat(Class stepClass, + ProcessConfiguration configuration, + String taskName, + HikariDataSource dataSource) + { + this.taskName = configuration.processName() + "." + taskName + ":" + configuration.node(); + this.taskBase = configuration.processName() + "." + taskName; + this.dataSource = dataSource; + + this.instanceUUID = UUID.randomUUID().toString(); + this.serviceInstanceUUID = configuration.instanceUuid().toString(); + + this.stepCount = stepClass.getEnumConstants().length; + + heartbeatInit(); + + runnerThread = new Thread(this::run); + runnerThread.start(); + } + + /** Update the progress of the task. This is a fast function that doesn't block; + * the actual update is done in a separate thread. + * + * @param step The current step in the task. + */ + public void progress(T step) { + this.step = step.name(); + + + // off by one since we calculate the progress based on the number of steps, + // and Enum.ordinal() is zero-based (so the 5th step in a 5 step task is 4, not 5; resulting in the + // final progress being 80% and not 100%) + + this.stepNum = 1 + step.ordinal(); + + logger.info("ProcessTask {} progress: {}", taskBase, step.name()); + } + + public void shutDown() { + if (!running) + return; + + running = false; + + try { + runnerThread.join(); + heartbeatStop(); + } + catch (InterruptedException|SQLException ex) { + logger.warn("ProcessHeartbeat shutdown failed", ex); + } + } + + private void run() { + if (!running) + running = true; + else + return; + + try { + while (running) { + try { + heartbeatUpdate(); + } + catch (SQLException ex) { + logger.warn("ProcessHeartbeat failed to update", ex); + } + + TimeUnit.SECONDS.sleep(heartbeatInterval); + } + } + catch (InterruptedException ex) { + logger.error("ProcessHeartbeat caught irrecoverable exception, killing service", ex); + System.exit(255); + } + } + + private void heartbeatInit() { + try (var connection = dataSource.getConnection()) { + try (var stmt = connection.prepareStatement( + """ + INSERT INTO TASK_HEARTBEAT (TASK_NAME, TASK_BASE, INSTANCE, SERVICE_INSTANCE, HEARTBEAT_TIME, STATUS) + VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING') + ON DUPLICATE KEY UPDATE + INSTANCE = ?, + SERVICE_INSTANCE = ?, + HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), + STATUS = 'STARTING' + """ + )) + { + stmt.setString(1, taskName); + stmt.setString(2, taskBase); + stmt.setString(3, instanceUUID); + stmt.setString(4, serviceInstanceUUID); + stmt.setString(5, instanceUUID); + stmt.setString(6, serviceInstanceUUID); + stmt.executeUpdate(); + } + } + catch (SQLException ex) { + logger.error("ProcessHeartbeat failed to initialize", ex); + throw new RuntimeException(ex); + } + + } + + private void heartbeatUpdate() throws SQLException { + try (var connection = dataSource.getConnection()) { + try (var stmt = connection.prepareStatement( + """ + UPDATE TASK_HEARTBEAT + SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), + STATUS = 'RUNNING', + PROGRESS = ?, + STAGE_NAME = ? + WHERE INSTANCE = ? + """) + ) + { + stmt.setInt(1, (int) Math.round(100 * stepNum / (double) stepCount)); + stmt.setString(2, step); + stmt.setString(3, instanceUUID); + stmt.executeUpdate(); + } + } + } + + private void heartbeatStop() throws SQLException { + try (var connection = dataSource.getConnection()) { + try (var stmt = connection.prepareStatement( + """ + UPDATE TASK_HEARTBEAT + SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), + STATUS='STOPPED', + PROGRESS = ?, + STAGE_NAME = ? + WHERE INSTANCE = ? + """) + ) + { + stmt.setInt(1, (int) Math.round(100 * stepNum / (double) stepCount)); + stmt.setString( 2, step); + stmt.setString( 3, instanceUUID); + stmt.executeUpdate(); + } + } + } + + @Override + public void close() { + shutDown(); + } + +} + diff --git a/code/features-index/index-forward/build.gradle b/code/features-index/index-forward/build.gradle index f55a02df..4dc08fbc 100644 --- a/code/features-index/index-forward/build.gradle +++ b/code/features-index/index-forward/build.gradle @@ -18,7 +18,7 @@ dependencies { implementation project(':code:features-index:index-journal') implementation project(':code:features-index:lexicon') implementation project(':code:common:model') - implementation project(':code:common:service') + implementation project(':code:common:process') implementation project(':third-party:uppend') diff --git a/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java b/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java index a5a750fe..42591dff 100644 --- a/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java +++ b/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexConverter.java @@ -6,10 +6,8 @@ import nu.marginalia.index.journal.reader.IndexJournalReader; import nu.marginalia.array.LongArray; import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; import nu.marginalia.model.idx.DocumentMetadata; +import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.ranking.DomainRankings; -import nu.marginalia.service.control.ServiceHeartbeat; -import org.roaringbitmap.IntConsumer; -import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.longlong.LongConsumer; import org.roaringbitmap.longlong.Roaring64Bitmap; import org.slf4j.Logger; @@ -22,7 +20,7 @@ import java.nio.file.Path; public class ForwardIndexConverter { - private final ServiceHeartbeat heartbeat; + private final ProcessHeartbeat heartbeat; private final File inputFile; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -32,7 +30,7 @@ public class ForwardIndexConverter { private final DomainRankings domainRankings; - public ForwardIndexConverter(ServiceHeartbeat heartbeat, + public ForwardIndexConverter(ProcessHeartbeat heartbeat, File inputFile, Path outputFileDocsId, Path outputFileDocsData, @@ -66,7 +64,7 @@ public class ForwardIndexConverter { logger.info("Domain Rankings size = {}", domainRankings.size()); - try (var progress = heartbeat.createServiceTaskHeartbeat(TaskSteps.class, "forwardIndexConverter")) { + try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "forwardIndexConverter")) { progress.progress(TaskSteps.GET_DOC_IDS); LongArray docsFileId = getDocIds(outputFileDocsId, journalReader); diff --git a/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexFileNames.java b/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexFileNames.java new file mode 100644 index 00000000..89cd0d6d --- /dev/null +++ b/code/features-index/index-forward/src/main/java/nu/marginalia/index/forward/ForwardIndexFileNames.java @@ -0,0 +1,28 @@ +package nu.marginalia.index.forward; + +import java.nio.file.Path; + +public class ForwardIndexFileNames { + public static Path resolve(Path basePath, FileIdentifier identifier, FileVersion version) { + return switch (identifier) { + case DOC_ID -> switch (version) { + case NEXT -> basePath.resolve("fwd-doc-id.dat.next"); + case CURRENT -> basePath.resolve("fwd-doc-id.dat"); + }; + case DOC_DATA -> switch (version) { + case NEXT -> basePath.resolve("fwd-doc-data.dat.next"); + case CURRENT -> basePath.resolve("fwd-doc-data.dat"); + }; + }; + } + + public enum FileVersion { + CURRENT, + NEXT + }; + + public enum FileIdentifier { + DOC_DATA, + DOC_ID + } +} diff --git a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java index efa2ee92..1118841d 100644 --- a/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java +++ b/code/features-index/index-forward/src/test/java/nu/marginalia/index/forward/ForwardIndexConverterTest.java @@ -6,11 +6,11 @@ import nu.marginalia.index.journal.writer.IndexJournalWriterImpl; import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.process.control.ProcessHeartbeat; +import nu.marginalia.process.control.ProcessTaskHeartbeat; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; -import nu.marginalia.service.control.ServiceHeartbeat; -import nu.marginalia.service.control.ServiceTaskHeartbeat; import nu.marginalia.test.TestUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -104,9 +104,9 @@ class ForwardIndexConverterTest { void testForwardIndex() throws IOException { // RIP fairies - var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class); - when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(ServiceTaskHeartbeat.class)); + var serviceHeartbeat = Mockito.mock(ProcessHeartbeat.class); + when(serviceHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(ProcessTaskHeartbeat.class)); new ForwardIndexConverter(serviceHeartbeat, indexFile.toFile(), docsFileId, docsFileData, new DomainRankings()).convert(); diff --git a/code/features-index/index-journal/src/main/java/nu/marginallia/index/journal/IndexJournalFileNames.java b/code/features-index/index-journal/src/main/java/nu/marginallia/index/journal/IndexJournalFileNames.java new file mode 100644 index 00000000..22f38f09 --- /dev/null +++ b/code/features-index/index-journal/src/main/java/nu/marginallia/index/journal/IndexJournalFileNames.java @@ -0,0 +1,9 @@ +package nu.marginallia.index.journal; + +import java.nio.file.Path; + +public class IndexJournalFileNames { + public static Path resolve(Path base) { + return base.resolve("page-index.dat"); + } +} diff --git a/code/features-index/index-reverse/build.gradle b/code/features-index/index-reverse/build.gradle index 90bf9411..e7286e45 100644 --- a/code/features-index/index-reverse/build.gradle +++ b/code/features-index/index-reverse/build.gradle @@ -20,7 +20,7 @@ dependencies { implementation project(':code:features-index:index-journal') implementation project(':code:features-index:lexicon') implementation project(':code:common:model') - implementation project(':code:common:service') + implementation project(':code:common:process') implementation libs.lombok annotationProcessor libs.lombok diff --git a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullConverter.java b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullConverter.java index cc44c35b..1003da96 100644 --- a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullConverter.java +++ b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullConverter.java @@ -8,6 +8,7 @@ import nu.marginalia.index.journal.model.IndexJournalEntryData; import nu.marginalia.index.journal.model.IndexJournalStatistics; import nu.marginalia.index.journal.reader.IndexJournalReader; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.rwf.RandomWriteFunnel; import nu.marginalia.array.IntArray; @@ -22,14 +23,12 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; -import nu.marginalia.service.control.ServiceHeartbeat; - import static nu.marginalia.index.full.ReverseIndexFullParameters.bTreeContext; public class ReverseIndexFullConverter { private static final int RWF_BIN_SIZE = 10_000_000; - private final ServiceHeartbeat heartbeat; + private final ProcessHeartbeat heartbeat; private final Path tmpFileDir; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -40,7 +39,7 @@ public class ReverseIndexFullConverter { private final Path outputFileDocs; private final SortingContext sortingContext; - public ReverseIndexFullConverter(ServiceHeartbeat heartbeat, + public ReverseIndexFullConverter(ProcessHeartbeat heartbeat, Path tmpFileDir, IndexJournalReader journalReader, DomainRankings domainRankings, @@ -77,7 +76,7 @@ public class ReverseIndexFullConverter { final Path intermediateUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat"); - try (var progress = heartbeat.createServiceTaskHeartbeat(TaskSteps.class, "reverseIndexFullConverter")) { + try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "reverseIndexFullConverter")) { progress.progress(TaskSteps.ACCUMULATE_STATISTICS); final IndexJournalStatistics statistics = journalReader.getStatistics(); diff --git a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullFileNames.java b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullFileNames.java new file mode 100644 index 00000000..c65e43af --- /dev/null +++ b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/full/ReverseIndexFullFileNames.java @@ -0,0 +1,28 @@ +package nu.marginalia.index.full; + +import java.nio.file.Path; + +public class ReverseIndexFullFileNames { + public static Path resolve(Path basePath, FileIdentifier identifier, FileVersion version) { + return switch (identifier) { + case WORDS -> switch (version) { + case NEXT -> basePath.resolve("rev-words.dat.next"); + case CURRENT -> basePath.resolve("rev-words.dat"); + }; + case DOCS -> switch (version) { + case NEXT -> basePath.resolve("rev-docs.dat.next"); + case CURRENT -> basePath.resolve("rev-docs.dat"); + }; + }; + } + + public enum FileVersion { + CURRENT, + NEXT + }; + + public enum FileIdentifier { + WORDS, + DOCS + } +} diff --git a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPrioFileNames.java b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPrioFileNames.java new file mode 100644 index 00000000..72ac6200 --- /dev/null +++ b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPrioFileNames.java @@ -0,0 +1,28 @@ +package nu.marginalia.index.priority; + +import java.nio.file.Path; + +public class ReverseIndexPrioFileNames { + public static Path resolve(Path basePath, FileIdentifier identifier, FileVersion version) { + return switch (identifier) { + case WORDS -> switch (version) { + case NEXT -> basePath.resolve("rev-prio-words.dat.next"); + case CURRENT -> basePath.resolve("rev-prio-words.dat"); + }; + case DOCS -> switch (version) { + case NEXT -> basePath.resolve("rev-prio-docs.dat.next"); + case CURRENT -> basePath.resolve("rev-prio-docs.dat"); + }; + }; + } + + public enum FileVersion { + CURRENT, + NEXT + }; + + public enum FileIdentifier { + WORDS, + DOCS + } +} diff --git a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPriorityConverter.java b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPriorityConverter.java index 90368c5a..17ef6895 100644 --- a/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPriorityConverter.java +++ b/code/features-index/index-reverse/src/main/java/nu/marginalia/index/priority/ReverseIndexPriorityConverter.java @@ -11,9 +11,9 @@ import nu.marginalia.index.journal.model.IndexJournalEntryData; import nu.marginalia.index.journal.model.IndexJournalStatistics; import nu.marginalia.index.journal.reader.IndexJournalReader; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.rwf.RandomWriteFunnel; -import nu.marginalia.service.control.ServiceHeartbeat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,7 +28,7 @@ import static nu.marginalia.index.priority.ReverseIndexPriorityParameters.bTreeC public class ReverseIndexPriorityConverter { private static final int RWF_BIN_SIZE = 10_000_000; - private final ServiceHeartbeat heartbeat; + private final ProcessHeartbeat heartbeat; private final Path tmpFileDir; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -39,7 +39,7 @@ public class ReverseIndexPriorityConverter { private final Path outputFileDocs; private final SortingContext sortingContext; - public ReverseIndexPriorityConverter(ServiceHeartbeat heartbeat, + public ReverseIndexPriorityConverter(ProcessHeartbeat heartbeat, Path tmpFileDir, IndexJournalReader journalReader, DomainRankings domainRankings, @@ -76,7 +76,7 @@ public class ReverseIndexPriorityConverter { final Path intermediateUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat"); - try (var progress = heartbeat.createServiceTaskHeartbeat(TaskSteps.class, "reverseIndexPriorityConverter")) { + try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "reverseIndexPriorityConverter")) { progress.progress(TaskSteps.ACCUMULATE_STATISTICS); final IndexJournalStatistics statistics = journalReader.getStatistics(); diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java index 49365e1a..0d20a54d 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest.java @@ -10,12 +10,12 @@ import nu.marginalia.index.journal.writer.IndexJournalWriterImpl; import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.process.control.ProcessHeartbeat; +import nu.marginalia.process.control.ProcessTaskHeartbeat; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; import nu.marginalia.model.idx.DocumentMetadata; -import nu.marginalia.service.control.ServiceHeartbeat; -import nu.marginalia.service.control.ServiceTaskHeartbeat; import nu.marginalia.test.TestUtil; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -90,12 +90,12 @@ class ReverseIndexFullConverterTest { var journalReader = new IndexJournalReaderSingleCompressedFile(indexFile); // RIP fairies - var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class); - when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(ServiceTaskHeartbeat.class)); + var processHeartbeat = Mockito.mock(ProcessHeartbeat.class); + when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(ProcessTaskHeartbeat.class)); new ReverseIndexFullConverter( - serviceHeartbeat, + processHeartbeat, tmpDir, journalReader, new DomainRankings(), wordsFile, docsFile) .convert(); diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java index e3a9848c..35c606f7 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexFullConverterTest2.java @@ -12,11 +12,11 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.priority.ReverseIndexPriorityParameters; import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.process.control.ProcessHeartbeat; +import nu.marginalia.process.control.ProcessTaskHeartbeat; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; -import nu.marginalia.service.control.ServiceHeartbeat; -import nu.marginalia.service.control.ServiceTaskHeartbeat; import nu.marginalia.test.TestUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -123,12 +123,11 @@ class ReverseIndexFullConverterTest2 { Path tmpDir = Path.of("/tmp"); - // RIP fairies - var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class); - when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(ServiceTaskHeartbeat.class)); + var processHeartbeat = Mockito.mock(ProcessHeartbeat.class); + when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(ProcessTaskHeartbeat.class)); - new ReverseIndexFullConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert(); + new ReverseIndexFullConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert(); var reverseReader = new ReverseIndexFullReader(wordsFile, docsFile); @@ -153,12 +152,11 @@ class ReverseIndexFullConverterTest2 { Path tmpDir = Path.of("/tmp"); - // RIP fairies - var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class); - when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(ServiceTaskHeartbeat.class)); + var processHeartbeat = Mockito.mock(ProcessHeartbeat.class); + when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(ProcessTaskHeartbeat.class)); - new ReverseIndexFullConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert(); + new ReverseIndexFullConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert(); var reverseReader = new ReverseIndexFullReader(wordsFile, docsFile); diff --git a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java index 12e72f99..d85863f2 100644 --- a/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java +++ b/code/features-index/index-reverse/src/test/java/nu/marginalia/index/reverse/ReverseIndexPriorityConverterTest2.java @@ -14,9 +14,9 @@ import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.model.id.UrlIdCodec; +import nu.marginalia.process.control.ProcessHeartbeat; +import nu.marginalia.process.control.ProcessTaskHeartbeat; import nu.marginalia.ranking.DomainRankings; -import nu.marginalia.service.control.ServiceHeartbeat; -import nu.marginalia.service.control.ServiceTaskHeartbeat; import nu.marginalia.test.TestUtil; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -123,12 +123,11 @@ class ReverseIndexPriorityConverterTest2 { Path tmpDir = Path.of("/tmp"); - // RIP fairies - var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class); - when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(ServiceTaskHeartbeat.class)); + var processHeartbeat = Mockito.mock(ProcessHeartbeat.class); + when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(ProcessTaskHeartbeat.class)); - new ReverseIndexPriorityConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert(); + new ReverseIndexPriorityConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert(); var reverseReader = new ReverseIndexPriorityReader(wordsFile, docsFile); @@ -153,12 +152,12 @@ class ReverseIndexPriorityConverterTest2 { Path tmpDir = Path.of("/tmp"); - // RIP fairies - var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class); - when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any())) - .thenReturn(Mockito.mock(ServiceTaskHeartbeat.class)); - new ReverseIndexPriorityConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert(); + var processHeartbeat = Mockito.mock(ProcessHeartbeat.class); + when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any())) + .thenReturn(Mockito.mock(ProcessTaskHeartbeat.class)); + + new ReverseIndexPriorityConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert(); var reverseReader = new ReverseIndexPriorityReader(wordsFile, docsFile); diff --git a/code/processes/index-constructor-process/build.gradle b/code/processes/index-constructor-process/build.gradle new file mode 100644 index 00000000..b96b245f --- /dev/null +++ b/code/processes/index-constructor-process/build.gradle @@ -0,0 +1,57 @@ +plugins { + id 'java' + id "io.freefair.lombok" version "8.2.2" + id 'application' + id 'jvm-test-suite' +} + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(20)) + } +} + +application { + mainClass = 'nu.marginalia.index.IndexConstructorMain' + applicationName = 'index-construction-process' +} + +tasks.distZip.enabled = false + +dependencies { + implementation project(':code:api:process-mqapi') + implementation project(':code:common:process') + implementation project(':code:common:service') + implementation project(':code:common:db') + implementation project(':code:common:model') + implementation project(':code:libraries:message-queue') + + implementation project(':code:features-index:index-forward') + implementation project(':code:features-index:index-reverse') + implementation project(':code:features-index:index-journal') + implementation project(':code:features-index:domain-ranking') + + implementation libs.lombok + annotationProcessor libs.lombok + + implementation libs.bundles.slf4j + implementation libs.guice + implementation libs.bundles.mariadb + implementation libs.bundles.gson + + testImplementation libs.bundles.slf4j.test + testImplementation libs.bundles.junit + testImplementation libs.mockito + + testImplementation project(':code:processes:test-data') +} + +test { + useJUnitPlatform() +} + +task fastTests(type: Test) { + useJUnitPlatform { + excludeTags "slow" + } +} diff --git a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java new file mode 100644 index 00000000..b520e487 --- /dev/null +++ b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorMain.java @@ -0,0 +1,214 @@ +package nu.marginalia.index; + +import com.google.gson.Gson; +import com.google.inject.Guice; +import com.google.inject.Inject; +import nu.marginalia.db.storage.FileStorageService; +import nu.marginalia.db.storage.model.FileStorage; +import nu.marginalia.db.storage.model.FileStorageType; +import nu.marginalia.index.forward.ForwardIndexConverter; +import nu.marginalia.index.forward.ForwardIndexFileNames; +import nu.marginalia.index.full.ReverseIndexFullConverter; +import nu.marginalia.index.full.ReverseIndexFullFileNames; +import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.index.priority.ReverseIndexPrioFileNames; +import nu.marginalia.index.priority.ReverseIndexPriorityConverter; +import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.mq.MessageQueueFactory; +import nu.marginalia.mq.MqMessage; +import nu.marginalia.mq.inbox.MqInboxResponse; +import nu.marginalia.mq.inbox.MqSingleShotInbox; +import nu.marginalia.mqapi.index.CreateIndexRequest; +import nu.marginalia.mqapi.index.IndexName; +import nu.marginalia.process.control.ProcessHeartbeat; +import nu.marginalia.ranking.DomainRankings; +import nu.marginalia.service.module.DatabaseModule; +import nu.marginallia.index.journal.IndexJournalFileNames; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX; + +public class IndexConstructorMain { + private final FileStorageService fileStorageService; + private final ProcessHeartbeat heartbeat; + private final MessageQueueFactory messageQueueFactory; + private final DomainRankings domainRankings; + private static final Logger logger = LoggerFactory.getLogger(IndexConstructorMain.class); + private final Gson gson = GsonFactory.get(); + public static void main(String[] args) throws Exception { + new org.mariadb.jdbc.Driver(); + + var main = Guice.createInjector( + new IndexConstructorModule(), + new DatabaseModule()) + .getInstance(IndexConstructorMain.class); + + var instructions = main.fetchInstructions(); + + try { + main.run(instructions); + instructions.ok(); + } + catch (Exception ex) { + logger.error("Constructor failed", ex); + instructions.err(); + } + + TimeUnit.SECONDS.sleep(5); + + System.exit(0); + } + + @Inject + public IndexConstructorMain(FileStorageService fileStorageService, + ProcessHeartbeat heartbeat, + MessageQueueFactory messageQueueFactory, + DomainRankings domainRankings) { + + this.fileStorageService = fileStorageService; + this.heartbeat = heartbeat; + this.messageQueueFactory = messageQueueFactory; + this.domainRankings = domainRankings; + } + + private void run(CreateIndexInstructions instructions) throws SQLException, IOException { + heartbeat.start(); + + switch (instructions.name) { + case FORWARD -> createForwardIndex(); + case REVERSE_FULL -> createFullReverseIndex(); + case REVERSE_PRIO -> createPrioReverseIndex(); + } + + heartbeat.shutDown(); + } + + private void createFullReverseIndex() throws SQLException, IOException { + + FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE); + FileStorage indexStaging = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING); + + Path inputFile = IndexJournalFileNames.resolve(indexStaging.asPath()); + Path outputFileDocs = ReverseIndexFullFileNames.resolve(indexLive.asPath(), ReverseIndexFullFileNames.FileIdentifier.DOCS, ReverseIndexFullFileNames.FileVersion.NEXT); + Path outputFileWords = ReverseIndexFullFileNames.resolve(indexLive.asPath(), ReverseIndexFullFileNames.FileIdentifier.WORDS, ReverseIndexFullFileNames.FileVersion.NEXT); + + Path tmpDir = indexStaging.asPath().resolve("tmp"); + if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir); + + var journalReader = new IndexJournalReaderSingleCompressedFile(inputFile); + + ReverseIndexFullConverter converter = new ReverseIndexFullConverter( + heartbeat, + tmpDir, + journalReader, + domainRankings, + outputFileWords, + outputFileDocs + ); + + converter.convert(); + } + + + private void createPrioReverseIndex() throws SQLException, IOException { + + FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE); + FileStorage indexStaging = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING); + + Path inputFile = IndexJournalFileNames.resolve(indexStaging.asPath()); + Path outputFileDocs = ReverseIndexPrioFileNames.resolve(indexLive.asPath(), ReverseIndexPrioFileNames.FileIdentifier.DOCS, ReverseIndexPrioFileNames.FileVersion.NEXT); + Path outputFileWords = ReverseIndexPrioFileNames.resolve(indexLive.asPath(), ReverseIndexPrioFileNames.FileIdentifier.WORDS, ReverseIndexPrioFileNames.FileVersion.NEXT); + + Path tmpDir = indexStaging.asPath().resolve("tmp"); + if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir); + + var journalReader = new IndexJournalReaderSingleCompressedFile(inputFile); + + ReverseIndexPriorityConverter converter = new ReverseIndexPriorityConverter( + heartbeat, + tmpDir, + journalReader, + domainRankings, + outputFileWords, + outputFileDocs + ); + + converter.convert(); + } + + private void createForwardIndex() throws SQLException, IOException { + + FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE); + FileStorage indexStaging = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING); + + Path inputFile = IndexJournalFileNames.resolve(indexStaging.asPath()); + Path outputFileDocsId = ForwardIndexFileNames.resolve(indexLive.asPath(), ForwardIndexFileNames.FileIdentifier.DOC_ID, ForwardIndexFileNames.FileVersion.NEXT); + Path outputFileDocsData = ForwardIndexFileNames.resolve(indexLive.asPath(), ForwardIndexFileNames.FileIdentifier.DOC_DATA, ForwardIndexFileNames.FileVersion.NEXT); + + ForwardIndexConverter converter = new ForwardIndexConverter(heartbeat, + inputFile.toFile(), + outputFileDocsId, + outputFileDocsData, + domainRankings + ); + + converter.convert(); + } + + private class CreateIndexInstructions { + public final IndexName name; + private final MqSingleShotInbox inbox; + private final MqMessage message; + + private CreateIndexInstructions(IndexName name, MqSingleShotInbox inbox, MqMessage message) { + this.name = name; + this.inbox = inbox; + this.message = message; + } + + public void ok() { + inbox.sendResponse(message, MqInboxResponse.ok()); + } + public void err() { + inbox.sendResponse(message, MqInboxResponse.err()); + } + } + + private CreateIndexInstructions fetchInstructions() throws Exception { + + var inbox = messageQueueFactory.createSingleShotInbox(INDEX_CONSTRUCTOR_INBOX, UUID.randomUUID()); + + logger.info("Waiting for instructions"); + var msgOpt = getMessage(inbox, CreateIndexRequest.class.getSimpleName()); + var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received")); + + var payload = gson.fromJson(msg.payload(), CreateIndexRequest.class); + var name = payload.indexName(); + + return new CreateIndexInstructions(name, inbox, msg); + } + + private Optional getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException { + var opt = inbox.waitForMessage(30, TimeUnit.SECONDS); + if (opt.isPresent()) { + if (!opt.get().function().equals(expectedFunction)) { + throw new RuntimeException("Unexpected function: " + opt.get().function()); + } + return opt; + } + else { + var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction)); + stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage)); + return stolenMessage; + } + } +} diff --git a/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java new file mode 100644 index 00000000..c4847e71 --- /dev/null +++ b/code/processes/index-constructor-process/src/main/java/nu/marginalia/index/IndexConstructorModule.java @@ -0,0 +1,14 @@ +package nu.marginalia.index; + +import com.google.inject.AbstractModule; +import nu.marginalia.ProcessConfiguration; + +import java.util.UUID; + +public class IndexConstructorModule extends AbstractModule { + @Override + public void configure() { + bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("index-constructor", 0, UUID.randomUUID())); + + } +} diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java index f6be34ac..a257f5dd 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/loader/LoaderIndexJournalWriter.java @@ -15,6 +15,7 @@ import nu.marginalia.lexicon.KeywordLexicon; import nu.marginalia.lexicon.journal.KeywordLexiconJournal; import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode; import nu.marginalia.model.idx.DocumentMetadata; +import nu.marginallia.index.journal.IndexJournalFileNames; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +39,7 @@ public class LoaderIndexJournalWriter { var indexArea = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING); var lexiconPath = lexiconArea.asPath().resolve("dictionary.dat"); - var indexPath = indexArea.asPath().resolve("page-index.dat"); + var indexPath = IndexJournalFileNames.resolve(indexArea.asPath()); Files.deleteIfExists(indexPath); Files.deleteIfExists(lexiconPath); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/Actor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/Actor.java index 75c96a22..23db8d8b 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/Actor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/Actor.java @@ -14,9 +14,9 @@ public enum Actor { CRAWL_JOB_EXTRACTOR, EXPORT_DATA, TRUNCATE_LINK_DATABASE, + INDEX_CONSTRUCTOR_MONITOR, CONVERT; - public String id() { return "fsm:" + name().toLowerCase(); } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java index 7be91df3..3575ecb2 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActors.java @@ -44,6 +44,7 @@ public class ControlActors { MessageQueueMonitorActor messageQueueMonitor, ProcessLivenessMonitorActor processMonitorFSM, FileStorageMonitorActor fileStorageMonitorActor, + IndexConstructorMonitorActor indexConstructorMonitorActor, TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor, CrawlJobExtractorActor crawlJobExtractorActor, ExportDataActor exportDataActor, @@ -58,6 +59,7 @@ public class ControlActors { register(Actor.CONVERT, convertActor); register(Actor.CONVERT_AND_LOAD, convertAndLoadActor); + register(Actor.INDEX_CONSTRUCTOR_MONITOR, indexConstructorMonitorActor); register(Actor.CONVERTER_MONITOR, converterMonitorFSM); register(Actor.LOADER_MONITOR, loaderMonitor); register(Actor.CRAWLER_MONITOR, crawlerMonitorActor); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java index b3b3473f..d95c9475 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/AbstractProcessSpawnerActor.java @@ -119,14 +119,14 @@ public class AbstractProcessSpawnerActor extends AbstractActorPrototype { if (attempts < MAX_ATTEMPTS) transition(RUN, attempts + 1); else error(); } - else if (endTime - startTime < TimeUnit.SECONDS.toMillis(10)) { + else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) { // To avoid boot loops, we transition to error if the process - // didn't run for longer than 10 seconds. This might happen if + // didn't run for longer than 1 seconds. This might happen if // the process crashes before it can reach the heartbeat and inbox // stages of execution. In this case it would not report having acted // on its message, and the process would be restarted forever without // the attempts counter incrementing. - error("Process terminated within 10 seconds of starting"); + error("Process terminated within 1 seconds of starting"); } } catch (InterruptedException ex) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/IndexConstructorMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/IndexConstructorMonitorActor.java new file mode 100644 index 00000000..abc44d6b --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/IndexConstructorMonitorActor.java @@ -0,0 +1,22 @@ +package nu.marginalia.control.actor.monitor; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import nu.marginalia.actor.ActorStateFactory; +import nu.marginalia.control.process.ProcessService; +import nu.marginalia.mq.persistence.MqPersistence; +import nu.marginalia.mqapi.ProcessInboxNames; + +@Singleton +public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor { + + + @Inject + public IndexConstructorMonitorActor(ActorStateFactory stateFactory, + MqPersistence persistence, + ProcessService processService) { + super(stateFactory, persistence, processService, ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX, ProcessService.ProcessId.INDEX_CONSTRUCTOR); + } + + +} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java index e0ad6979..c0662937 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/task/ConvertAndLoadActor.java @@ -13,6 +13,8 @@ import nu.marginalia.index.client.IndexClient; import nu.marginalia.index.client.IndexMqEndpoints; import nu.marginalia.mqapi.converting.ConvertAction; import nu.marginalia.mqapi.converting.ConvertRequest; +import nu.marginalia.mqapi.index.CreateIndexRequest; +import nu.marginalia.mqapi.index.IndexName; import nu.marginalia.mqapi.loading.LoadRequest; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorageBaseType; @@ -45,14 +47,19 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { public static final String REPARTITION = "REPARTITION"; public static final String REPARTITION_WAIT = "REPARTITION-WAIT"; - public static final String REINDEX = "REINDEX"; - public static final String REINDEX_WAIT = "REINDEX-WAIT"; - public static final String SWITCH_LINKDB = "SWITCH-LINKDB"; + public static final String REINDEX_FWD = "REINDEX_FWD"; + public static final String REINDEX_FWD_WAIT = "REINDEX-FWD-WAIT"; + public static final String REINDEX_FULL = "REINDEX_FULL"; + public static final String REINDEX_FULL_WAIT = "REINDEX-FULL-WAIT"; + public static final String REINDEX_PRIO = "REINDEX_PRIO"; + public static final String REINDEX_PRIO_WAIT = "REINDEX-PRIO-WAIT"; + public static final String SWITCH_OVER = "SWITCH-LINKDB"; public static final String END = "END"; private final ActorProcessWatcher processWatcher; private final MqOutbox mqConverterOutbox; private final MqOutbox mqLoaderOutbox; + private final MqOutbox mqIndexConstructorOutbox; private final MqOutbox indexOutbox; private final MqOutbox searchOutbox; private final FileStorageService storageService; @@ -89,6 +96,7 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { this.searchOutbox = searchClient.outbox(); this.mqConverterOutbox = processOutboxes.getConverterOutbox(); this.mqLoaderOutbox = processOutboxes.getLoaderOutbox(); + this.mqIndexConstructorOutbox = processOutboxes.getIndexConstructorOutbox(); this.storageService = storageService; this.gson = gson; } @@ -228,7 +236,7 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { @ActorState( name = REPARTITION_WAIT, - next = REINDEX, + next = REINDEX_FWD, resume = ActorResumeBehavior.RETRY, description = """ Wait for the index-service to finish repartitioning the index. @@ -243,26 +251,27 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { } @ActorState( - name = REINDEX, - next = REINDEX_WAIT, + name = REINDEX_FWD, + next = REINDEX_FWD_WAIT, description = """ - Instruct the index-service to reindex the data then transition to REINDEX_WAIT. + Reconstruct the fwd index """ ) - public Long reindex() throws Exception { - return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, ""); + public Long reindexFwd() throws Exception { + var request = new CreateIndexRequest(IndexName.FORWARD); + return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)); } @ActorState( - name = REINDEX_WAIT, - next = SWITCH_LINKDB, + name = REINDEX_FWD_WAIT, + next = REINDEX_FULL, resume = ActorResumeBehavior.RETRY, description = """ - Wait for the index-service to finish reindexing the data. + Wait for the reindex job to finish. """ ) - public void reindexReply(Long id) throws Exception { - var rsp = indexOutbox.waitResponse(id); + public void reindexFwdWait(Long id) throws Exception { + var rsp = mqIndexConstructorOutbox.waitResponse(id); if (rsp.state() != MqMessageState.OK) { error("Repartition failed"); @@ -270,15 +279,74 @@ public class ConvertAndLoadActor extends AbstractActorPrototype { } @ActorState( - name = SWITCH_LINKDB, + name = REINDEX_FULL, + next = REINDEX_FULL_WAIT, + description = """ + Reconstruct the full index + """ + ) + public Long reindexFull() throws Exception { + var request = new CreateIndexRequest(IndexName.REVERSE_FULL); + return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)); + } + + @ActorState( + name = REINDEX_FULL_WAIT, + next = REINDEX_PRIO, + resume = ActorResumeBehavior.RETRY, + description = """ + Wait for the reindex job to finish. + """ + ) + public void reindexFullWait(Long id) throws Exception { + var rsp = mqIndexConstructorOutbox.waitResponse(id); + + if (rsp.state() != MqMessageState.OK) { + error("Repartition failed"); + } + } + + @ActorState( + name = REINDEX_PRIO, + next = REINDEX_PRIO_WAIT, + resume = ActorResumeBehavior.RETRY, + description = """ + Reconstruct the prio index + """ + ) + public long reindexPrio() throws Exception { + var request = new CreateIndexRequest(IndexName.REVERSE_PRIO); + return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)); + } + @ActorState( + name = REINDEX_PRIO_WAIT, + next = SWITCH_OVER, + resume = ActorResumeBehavior.RETRY, + description = """ + Wait for the reindex job to finish. + """ + ) + public void reindexPrioWait(Long id) throws Exception { + var rsp = mqIndexConstructorOutbox.waitResponse(id); + + if (rsp.state() != MqMessageState.OK) { + error("Repartition failed"); + } + } + + + @ActorState( + name = SWITCH_OVER, next = END, resume = ActorResumeBehavior.RETRY, description = """ - Instruct the search service to switch to the new linkdb + Instruct the search service to switch to the new linkdb, + and the index service to switch over to the new index. """ ) - public void switchLinkdb(Long id) throws Exception { + public void switchOver(Long id) throws Exception { searchOutbox.sendNotice(SearchMqEndpoints.SWITCH_LINKDB, ":-)"); + indexOutbox.sendNotice(IndexMqEndpoints.INDEX_REINDEX, ":^D"); } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java index accb3351..5d8554bb 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/model/ProcessHeartbeat.java @@ -44,6 +44,7 @@ public record ProcessHeartbeat( case "loader" -> ProcessService.ProcessId.LOADER; case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR; case "crawl-job-extractor" -> ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR; + case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR; default -> null; }; } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessOutboxes.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessOutboxes.java index b5b74406..cb45b6f5 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessOutboxes.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessOutboxes.java @@ -12,6 +12,7 @@ public class ProcessOutboxes { private final MqOutbox converterOutbox; private final MqOutbox loaderOutbox; private final MqOutbox crawlerOutbox; + private final MqOutbox indexConstructorOutbox; @Inject public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) { @@ -30,6 +31,11 @@ public class ProcessOutboxes { params.configuration.serviceName(), params.configuration.instanceUuid() ); + indexConstructorOutbox = new MqOutbox(persistence, + ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX, + params.configuration.serviceName(), + params.configuration.instanceUuid() + ); } @@ -44,4 +50,6 @@ public class ProcessOutboxes { public MqOutbox getCrawlerOutbox() { return crawlerOutbox; } + + public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; } } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessService.java index 14403dfb..5d71ff5c 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/process/ProcessService.java @@ -32,6 +32,7 @@ public class ProcessService { CRAWLER("crawler-process/bin/crawler-process"), CONVERTER("converter-process/bin/converter-process"), LOADER("loader-process/bin/loader-process"), + INDEX_CONSTRUCTOR("index-construction-process/bin/index-construction-process"), ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator"), CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process") ; diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java index a0ff5582..f494a5d5 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java @@ -11,6 +11,7 @@ import nu.marginalia.index.svc.IndexSearchSetsService; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.*; +import nu.marginalia.service.server.mq.MqNotification; import nu.marginalia.service.server.mq.MqRequest; import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; @@ -93,7 +94,7 @@ public class IndexService extends Service { return "ok"; } - @MqRequest(endpoint = IndexMqEndpoints.INDEX_REINDEX) + @MqNotification(endpoint = IndexMqEndpoints.INDEX_REINDEX) public String reindex(String message) throws Exception { if (!opsService.reindex()) { throw new IllegalStateException("Ops lock busy"); @@ -112,34 +113,8 @@ public class IndexService extends Service { searchIndex.init(); initialized = true; } - - if (!opsService.run(this::autoConvert)) { - logger.warn("Auto-convert could not be performed, ops lock busy"); - } } - private void autoConvert() { - if (!servicesFactory.isConvertedIndexMissing() - || !servicesFactory.isPreconvertedIndexPresent() - || Boolean.getBoolean("no-auto-convert") - ) { - return; - } - - try { - eventLog.logEvent("INDEX-AUTO-CONVERT-BEGIN", ""); - logger.info("Auto-converting"); - searchSetsService.recalculateAll(); - searchIndex.switchIndex(); - eventLog.logEvent("INDEX-AUTO-CONVERT-END", ""); - logger.info("Auto-conversion finished!"); - } - catch (IOException ex) { - logger.error("Auto convert failed", ex); - } - } - - } diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java index 9e0c2a04..e4c45e4f 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexServicesFactory.java @@ -5,8 +5,11 @@ import com.google.inject.Singleton; import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.index.forward.ForwardIndexConverter; +import nu.marginalia.index.forward.ForwardIndexFileNames; import nu.marginalia.index.forward.ForwardIndexReader; +import nu.marginalia.index.full.ReverseIndexFullFileNames; import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile; +import nu.marginalia.index.priority.ReverseIndexPrioFileNames; import nu.marginalia.index.priority.ReverseIndexPriorityConverter; import nu.marginalia.index.full.ReverseIndexFullConverter; import nu.marginalia.index.priority.ReverseIndexPriorityReader; @@ -15,6 +18,7 @@ import nu.marginalia.index.full.ReverseIndexFullReader; import nu.marginalia.ranking.DomainRankings; import nu.marginalia.index.index.SearchIndexReader; import nu.marginalia.service.control.ServiceHeartbeat; +import org.checkerframework.checker.units.qual.C; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,23 +33,8 @@ import java.util.stream.Stream; @Singleton public class IndexServicesFactory { - private final Path tmpFileDir; - private final ServiceHeartbeat heartbeat; private final Path liveStorage; - private final Path stagingStorage; - private final Logger logger = LoggerFactory.getLogger(getClass()); - - private final Path writerIndexFile; - - private final PartitionedDataFile fwdIndexDocId; - private final PartitionedDataFile fwdIndexDocData; - private final PartitionedDataFile revIndexDoc; - private final PartitionedDataFile revIndexWords; - - private final PartitionedDataFile revPrioIndexDoc; - private final PartitionedDataFile revPrioIndexWords; - private final Path searchSetsBase; final int LIVE_PART = 0; @@ -56,165 +45,58 @@ public class IndexServicesFactory { ServiceHeartbeat heartbeat, FileStorageService fileStorageService ) throws IOException, SQLException { - this.heartbeat = heartbeat; liveStorage = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE).asPath(); - stagingStorage = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING).asPath(); - tmpFileDir = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING).asPath().resolve("tmp"); searchSetsBase = fileStorageService.getStorageByType(FileStorageType.SEARCH_SETS).asPath(); - if (!Files.exists(tmpFileDir)) { - Files.createDirectories(tmpFileDir); - } - - writerIndexFile = stagingStorage.resolve("page-index.dat"); - - fwdIndexDocId = new PartitionedDataFile(liveStorage, "fwd-doc-id.dat"); - fwdIndexDocData = new PartitionedDataFile(liveStorage, "fwd-doc-data.dat"); - - revIndexDoc = new PartitionedDataFile(liveStorage, "rev-doc.dat"); - revIndexWords = new PartitionedDataFile(liveStorage, "rev-words.dat"); - - revPrioIndexDoc = new PartitionedDataFile(liveStorage, "rev-prio-doc.dat"); - revPrioIndexWords = new PartitionedDataFile(liveStorage, "rev-prio-words.dat"); } public Path getSearchSetsBase() { return searchSetsBase; } - public boolean isPreconvertedIndexPresent() { - return Stream.of( - writerIndexFile - ).allMatch(Files::exists); - } - - public boolean isConvertedIndexMissing() { - return Stream.of( - revIndexWords.get(LIVE_PART).toPath(), - revIndexDoc.get(LIVE_PART).toPath(), - revPrioIndexWords.get(LIVE_PART).toPath(), - revPrioIndexDoc.get(LIVE_PART).toPath(), - fwdIndexDocData.get(LIVE_PART).toPath(), - fwdIndexDocId.get(LIVE_PART).toPath() - ).noneMatch(Files::exists); - } - - enum ConvertSteps { - FORWARD_INDEX, - FULL_REVERSE_INDEX, - PRIORITY_REVERSE_INDEX, - FINISHED - } - public void convertIndex(DomainRankings domainRankings) throws IOException { - try (var hb = heartbeat.createServiceTaskHeartbeat(ConvertSteps.class, "index-conversion")) { - hb.progress(ConvertSteps.FORWARD_INDEX); - convertForwardIndex(domainRankings); - - hb.progress(ConvertSteps.FULL_REVERSE_INDEX); - convertFullReverseIndex(domainRankings); - - hb.progress(ConvertSteps.PRIORITY_REVERSE_INDEX); - convertPriorityReverseIndex(domainRankings); - - hb.progress(ConvertSteps.FINISHED); - } - } - - private void convertFullReverseIndex(DomainRankings domainRankings) throws IOException { - logger.info("Converting full reverse index {}", writerIndexFile); - - var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile); - var converter = new ReverseIndexFullConverter( - heartbeat, - tmpFileDir, - journalReader, - domainRankings, - revIndexWords.get(NEXT_PART).toPath(), - revIndexDoc.get(NEXT_PART).toPath()); - - converter.convert(); - - tryGc(); - } - - private void convertPriorityReverseIndex(DomainRankings domainRankings) throws IOException { - - logger.info("Converting priority reverse index {}", writerIndexFile); - - var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile, null, - ReverseIndexPriorityParameters::filterPriorityRecord); - - var converter = new ReverseIndexPriorityConverter(heartbeat, - tmpFileDir, - journalReader, - domainRankings, - revPrioIndexWords.get(NEXT_PART).toPath(), - revPrioIndexDoc.get(NEXT_PART).toPath()); - - converter.convert(); - - tryGc(); - } - - private void convertForwardIndex(DomainRankings domainRankings) throws IOException { - - - logger.info("Converting forward index data {}", writerIndexFile); - - new ForwardIndexConverter(heartbeat, - writerIndexFile.toFile(), - fwdIndexDocId.get(NEXT_PART).toPath(), - fwdIndexDocData.get(NEXT_PART).toPath(), - domainRankings) - .convert(); - - tryGc(); - } - - - public void tryGc() { - - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - System.gc(); - } - public ReverseIndexFullReader getReverseIndexReader() throws IOException { + return new ReverseIndexFullReader( - revIndexWords.get(LIVE_PART).toPath(), - revIndexDoc.get(LIVE_PART).toPath()); - } - public ReverseIndexPriorityReader getReverseIndexPrioReader() throws IOException { - return new ReverseIndexPriorityReader( - revPrioIndexWords.get(LIVE_PART).toPath(), - revPrioIndexDoc.get(LIVE_PART).toPath()); - } - public ForwardIndexReader getForwardIndexReader() throws IOException { - return new ForwardIndexReader( - fwdIndexDocId.get(LIVE_PART).toPath(), - fwdIndexDocData.get(LIVE_PART).toPath() + ReverseIndexFullFileNames.resolve(liveStorage, ReverseIndexFullFileNames.FileIdentifier.WORDS, ReverseIndexFullFileNames.FileVersion.CURRENT), + ReverseIndexFullFileNames.resolve(liveStorage, ReverseIndexFullFileNames.FileIdentifier.DOCS, ReverseIndexFullFileNames.FileVersion.CURRENT) ); } - public Callable switchFilesJob() { - return () -> { + public ReverseIndexPriorityReader getReverseIndexPrioReader() throws IOException { + return new ReverseIndexPriorityReader( + ReverseIndexPrioFileNames.resolve(liveStorage, ReverseIndexPrioFileNames.FileIdentifier.WORDS, ReverseIndexPrioFileNames.FileVersion.CURRENT), + ReverseIndexPrioFileNames.resolve(liveStorage, ReverseIndexPrioFileNames.FileIdentifier.DOCS, ReverseIndexPrioFileNames.FileVersion.CURRENT) + ); + } - switchFile(revIndexDoc.get(NEXT_PART).toPath(), revIndexDoc.get(LIVE_PART).toPath()); - switchFile(revIndexWords.get(NEXT_PART).toPath(), revIndexWords.get(LIVE_PART).toPath()); + public ForwardIndexReader getForwardIndexReader() throws IOException { + return new ForwardIndexReader( + ForwardIndexFileNames.resolve(liveStorage, ForwardIndexFileNames.FileIdentifier.DOC_ID, ForwardIndexFileNames.FileVersion.CURRENT), + ForwardIndexFileNames.resolve(liveStorage, ForwardIndexFileNames.FileIdentifier.DOC_DATA, ForwardIndexFileNames.FileVersion.CURRENT) + ); + } - switchFile(revPrioIndexDoc.get(NEXT_PART).toPath(), revPrioIndexDoc.get(LIVE_PART).toPath()); - switchFile(revPrioIndexWords.get(NEXT_PART).toPath(), revPrioIndexWords.get(LIVE_PART).toPath()); + public void switchFiles() throws IOException { - switchFile(fwdIndexDocId.get(NEXT_PART).toPath(), fwdIndexDocId.get(LIVE_PART).toPath()); - switchFile(fwdIndexDocData.get(NEXT_PART).toPath(), fwdIndexDocData.get(LIVE_PART).toPath()); - - return true; - }; + for (var file : ReverseIndexFullFileNames.FileIdentifier.values()) { + switchFile( + ReverseIndexFullFileNames.resolve(liveStorage, file, ReverseIndexFullFileNames.FileVersion.NEXT), + ReverseIndexFullFileNames.resolve(liveStorage, file, ReverseIndexFullFileNames.FileVersion.CURRENT) + ); + } + for (var file : ReverseIndexPrioFileNames.FileIdentifier.values()) { + switchFile( + ReverseIndexPrioFileNames.resolve(liveStorage, file, ReverseIndexPrioFileNames.FileVersion.NEXT), + ReverseIndexPrioFileNames.resolve(liveStorage, file, ReverseIndexPrioFileNames.FileVersion.CURRENT) + ); + } + for (var file : ForwardIndexFileNames.FileIdentifier.values()) { + switchFile( + ForwardIndexFileNames.resolve(liveStorage, file, ForwardIndexFileNames.FileVersion.NEXT), + ForwardIndexFileNames.resolve(liveStorage, file, ForwardIndexFileNames.FileVersion.CURRENT) + ); + } } public void switchFile(Path from, Path to) throws IOException { @@ -231,37 +113,3 @@ public class IndexServicesFactory { ); } } - -class RootDataFile { - private final Path partition; - private final String pattern; - - RootDataFile(Path partition, String pattern) { - this.partition = partition; - this.pattern = pattern; - } - - public File get() { - return partition.resolve(pattern).toFile(); - } -} - - -class PartitionedDataFile { - private final Path partition; - private final String pattern; - - PartitionedDataFile(Path partition, String pattern) { - this.partition = partition; - this.pattern = pattern; - } - - public File get(Object id) { - Path partitionDir = partition.resolve(id.toString()); - if (!partitionDir.toFile().exists()) { - partitionDir.toFile().mkdir(); - } - return partitionDir.resolve(pattern).toFile(); - } - -} \ No newline at end of file diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/index/SearchIndex.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/index/SearchIndex.java index 0da02a42..af9a5d05 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/index/SearchIndex.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/index/SearchIndex.java @@ -62,8 +62,6 @@ public class SearchIndex { else { eventLog.logEvent("INDEX-INIT", "No index loaded"); } - - } catch (Exception ex) { logger.error("Uncaught exception", ex); @@ -74,19 +72,12 @@ public class SearchIndex { } public boolean switchIndex() throws IOException { - - eventLog.logEvent("CONVERT-INDEX-BEGIN", ""); - servicesFactory.convertIndex(searchSetsService.getDomainRankings()); - eventLog.logEvent("CONVERT-INDEX-END", ""); - System.gc(); - eventLog.logEvent("INDEX-SWITCH-BEGIN", ""); Lock lock = indexReplacementLock.writeLock(); try { lock.lock(); - servicesFactory.switchFilesJob().call(); - + servicesFactory.switchFiles(); indexReader = servicesFactory.getSearchIndexReader(); eventLog.logEvent("INDEX-SWITCH-OK", ""); diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 15de9024..db9a6b82 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,5 +1,5 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/settings.gradle b/settings.gradle index 623e6576..1b579d30 100644 --- a/settings.gradle +++ b/settings.gradle @@ -61,6 +61,7 @@ include 'code:common:process' include 'code:processes:converting-process' include 'code:processes:crawling-process' include 'code:processes:loading-process' +include 'code:processes:index-constructor-process' include 'code:processes:test-data' include 'code:process-models:converting-model'