(index) Move index construction to separate process.

This provides a much cleaner separation of concerns, and makes it possible to get rid of a lot of the gunkier parts of the index service.  It will also permit lowering the Xmx on the index service a fair bit, so we can get CompressedOOps again :D
This commit is contained in:
Viktor Lofgren 2023-08-25 12:52:54 +02:00
parent e741301417
commit 460998d512
36 changed files with 809 additions and 302 deletions

View File

@ -34,6 +34,10 @@ tasks.register('dist', Copy) {
from tarTree("$buildDir/dist/crawl-job-extractor-process.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/index-construction-process.tar")
into "$projectDir/run/dist/"
}
}
}
idea {

View File

@ -4,4 +4,6 @@ public class ProcessInboxNames {
public static final String CONVERTER_INBOX = "converter";
public static final String LOADER_INBOX = "loader";
public static final String CRAWLER_INBOX = "crawler";
public static final String INDEX_CONSTRUCTOR_INBOX = "index_constructor";
}

View File

@ -0,0 +1,5 @@
package nu.marginalia.mqapi.index;
public record CreateIndexRequest(IndexName indexName)
{
}

View File

@ -0,0 +1,7 @@
package nu.marginalia.mqapi.index;
public enum IndexName {
FORWARD,
REVERSE_FULL,
REVERSE_PRIO
}

View File

@ -19,6 +19,8 @@ public class ProcessHeartbeat {
private final String processName;
private final String processBase;
private final String instanceUUID;
@org.jetbrains.annotations.NotNull
private final ProcessConfiguration configuration;
private final HikariDataSource dataSource;
@ -35,6 +37,7 @@ public class ProcessHeartbeat {
{
this.processName = configuration.processName() + ":" + configuration.node();
this.processBase = configuration.processName();
this.configuration = configuration;
this.dataSource = dataSource;
this.instanceUUID = configuration.instanceUuid().toString();
@ -44,6 +47,12 @@ public class ProcessHeartbeat {
Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown));
}
public <T extends Enum<T>> ProcessTaskHeartbeat<T> createProcessTaskHeartbeat(Class<T> steps, String processName) {
return new ProcessTaskHeartbeat<>(steps, configuration, processName, dataSource);
}
public void setProgress(double progress) {
this.progress = (int) (progress * 100);
}

View File

@ -0,0 +1,190 @@
package nu.marginalia.process.control;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/** This object sends a heartbeat to the database every few seconds,
* updating with the progress of a task within a service. Progress is tracked by providing
* enumerations corresponding to the steps in the task. It's important they're arranged in the same
* order as the steps in the task in order to get an accurate progress tracking.
*/
public class ProcessTaskHeartbeat<T extends Enum<T>> implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(ProcessTaskHeartbeat.class);
private final String taskName;
private final String taskBase;
private final String instanceUUID;
private final HikariDataSource dataSource;
private final Thread runnerThread;
private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1);
private final String serviceInstanceUUID;
private final int stepCount;
private volatile boolean running = false;
private volatile int stepNum = 0;
private volatile String step = "-";
ProcessTaskHeartbeat(Class<T> stepClass,
ProcessConfiguration configuration,
String taskName,
HikariDataSource dataSource)
{
this.taskName = configuration.processName() + "." + taskName + ":" + configuration.node();
this.taskBase = configuration.processName() + "." + taskName;
this.dataSource = dataSource;
this.instanceUUID = UUID.randomUUID().toString();
this.serviceInstanceUUID = configuration.instanceUuid().toString();
this.stepCount = stepClass.getEnumConstants().length;
heartbeatInit();
runnerThread = new Thread(this::run);
runnerThread.start();
}
/** Update the progress of the task. This is a fast function that doesn't block;
* the actual update is done in a separate thread.
*
* @param step The current step in the task.
*/
public void progress(T step) {
this.step = step.name();
// off by one since we calculate the progress based on the number of steps,
// and Enum.ordinal() is zero-based (so the 5th step in a 5 step task is 4, not 5; resulting in the
// final progress being 80% and not 100%)
this.stepNum = 1 + step.ordinal();
logger.info("ProcessTask {} progress: {}", taskBase, step.name());
}
public void shutDown() {
if (!running)
return;
running = false;
try {
runnerThread.join();
heartbeatStop();
}
catch (InterruptedException|SQLException ex) {
logger.warn("ProcessHeartbeat shutdown failed", ex);
}
}
private void run() {
if (!running)
running = true;
else
return;
try {
while (running) {
try {
heartbeatUpdate();
}
catch (SQLException ex) {
logger.warn("ProcessHeartbeat failed to update", ex);
}
TimeUnit.SECONDS.sleep(heartbeatInterval);
}
}
catch (InterruptedException ex) {
logger.error("ProcessHeartbeat caught irrecoverable exception, killing service", ex);
System.exit(255);
}
}
private void heartbeatInit() {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
INSERT INTO TASK_HEARTBEAT (TASK_NAME, TASK_BASE, INSTANCE, SERVICE_INSTANCE, HEARTBEAT_TIME, STATUS)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING')
ON DUPLICATE KEY UPDATE
INSTANCE = ?,
SERVICE_INSTANCE = ?,
HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS = 'STARTING'
"""
))
{
stmt.setString(1, taskName);
stmt.setString(2, taskBase);
stmt.setString(3, instanceUUID);
stmt.setString(4, serviceInstanceUUID);
stmt.setString(5, instanceUUID);
stmt.setString(6, serviceInstanceUUID);
stmt.executeUpdate();
}
}
catch (SQLException ex) {
logger.error("ProcessHeartbeat failed to initialize", ex);
throw new RuntimeException(ex);
}
}
private void heartbeatUpdate() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE TASK_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS = 'RUNNING',
PROGRESS = ?,
STAGE_NAME = ?
WHERE INSTANCE = ?
""")
)
{
stmt.setInt(1, (int) Math.round(100 * stepNum / (double) stepCount));
stmt.setString(2, step);
stmt.setString(3, instanceUUID);
stmt.executeUpdate();
}
}
}
private void heartbeatStop() throws SQLException {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE TASK_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
STATUS='STOPPED',
PROGRESS = ?,
STAGE_NAME = ?
WHERE INSTANCE = ?
""")
)
{
stmt.setInt(1, (int) Math.round(100 * stepNum / (double) stepCount));
stmt.setString( 2, step);
stmt.setString( 3, instanceUUID);
stmt.executeUpdate();
}
}
}
@Override
public void close() {
shutDown();
}
}

View File

@ -18,7 +18,7 @@ dependencies {
implementation project(':code:features-index:index-journal')
implementation project(':code:features-index:lexicon')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':code:common:process')
implementation project(':third-party:uppend')

View File

@ -6,10 +6,8 @@ import nu.marginalia.index.journal.reader.IndexJournalReader;
import nu.marginalia.array.LongArray;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.roaringbitmap.IntConsumer;
import org.roaringbitmap.RoaringBitmap;
import org.roaringbitmap.longlong.LongConsumer;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
@ -22,7 +20,7 @@ import java.nio.file.Path;
public class ForwardIndexConverter {
private final ServiceHeartbeat heartbeat;
private final ProcessHeartbeat heartbeat;
private final File inputFile;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -32,7 +30,7 @@ public class ForwardIndexConverter {
private final DomainRankings domainRankings;
public ForwardIndexConverter(ServiceHeartbeat heartbeat,
public ForwardIndexConverter(ProcessHeartbeat heartbeat,
File inputFile,
Path outputFileDocsId,
Path outputFileDocsData,
@ -66,7 +64,7 @@ public class ForwardIndexConverter {
logger.info("Domain Rankings size = {}", domainRankings.size());
try (var progress = heartbeat.createServiceTaskHeartbeat(TaskSteps.class, "forwardIndexConverter")) {
try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "forwardIndexConverter")) {
progress.progress(TaskSteps.GET_DOC_IDS);
LongArray docsFileId = getDocIds(outputFileDocsId, journalReader);

View File

@ -0,0 +1,28 @@
package nu.marginalia.index.forward;
import java.nio.file.Path;
public class ForwardIndexFileNames {
public static Path resolve(Path basePath, FileIdentifier identifier, FileVersion version) {
return switch (identifier) {
case DOC_ID -> switch (version) {
case NEXT -> basePath.resolve("fwd-doc-id.dat.next");
case CURRENT -> basePath.resolve("fwd-doc-id.dat");
};
case DOC_DATA -> switch (version) {
case NEXT -> basePath.resolve("fwd-doc-data.dat.next");
case CURRENT -> basePath.resolve("fwd-doc-data.dat");
};
};
}
public enum FileVersion {
CURRENT,
NEXT
};
public enum FileIdentifier {
DOC_DATA,
DOC_ID
}
}

View File

@ -6,11 +6,11 @@ import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessTaskHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.control.ServiceTaskHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -104,9 +104,9 @@ class ForwardIndexConverterTest {
void testForwardIndex() throws IOException {
// RIP fairies
var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class);
when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ServiceTaskHeartbeat.class));
var serviceHeartbeat = Mockito.mock(ProcessHeartbeat.class);
when(serviceHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ProcessTaskHeartbeat.class));
new ForwardIndexConverter(serviceHeartbeat, indexFile.toFile(), docsFileId, docsFileData, new DomainRankings()).convert();

View File

@ -0,0 +1,9 @@
package nu.marginallia.index.journal;
import java.nio.file.Path;
public class IndexJournalFileNames {
public static Path resolve(Path base) {
return base.resolve("page-index.dat");
}
}

View File

@ -20,7 +20,7 @@ dependencies {
implementation project(':code:features-index:index-journal')
implementation project(':code:features-index:lexicon')
implementation project(':code:common:model')
implementation project(':code:common:service')
implementation project(':code:common:process')
implementation libs.lombok
annotationProcessor libs.lombok

View File

@ -8,6 +8,7 @@ import nu.marginalia.index.journal.model.IndexJournalEntryData;
import nu.marginalia.index.journal.model.IndexJournalStatistics;
import nu.marginalia.index.journal.reader.IndexJournalReader;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.rwf.RandomWriteFunnel;
import nu.marginalia.array.IntArray;
@ -22,14 +23,12 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import nu.marginalia.service.control.ServiceHeartbeat;
import static nu.marginalia.index.full.ReverseIndexFullParameters.bTreeContext;
public class ReverseIndexFullConverter {
private static final int RWF_BIN_SIZE = 10_000_000;
private final ServiceHeartbeat heartbeat;
private final ProcessHeartbeat heartbeat;
private final Path tmpFileDir;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -40,7 +39,7 @@ public class ReverseIndexFullConverter {
private final Path outputFileDocs;
private final SortingContext sortingContext;
public ReverseIndexFullConverter(ServiceHeartbeat heartbeat,
public ReverseIndexFullConverter(ProcessHeartbeat heartbeat,
Path tmpFileDir,
IndexJournalReader journalReader,
DomainRankings domainRankings,
@ -77,7 +76,7 @@ public class ReverseIndexFullConverter {
final Path intermediateUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat");
try (var progress = heartbeat.createServiceTaskHeartbeat(TaskSteps.class, "reverseIndexFullConverter")) {
try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "reverseIndexFullConverter")) {
progress.progress(TaskSteps.ACCUMULATE_STATISTICS);
final IndexJournalStatistics statistics = journalReader.getStatistics();

View File

@ -0,0 +1,28 @@
package nu.marginalia.index.full;
import java.nio.file.Path;
public class ReverseIndexFullFileNames {
public static Path resolve(Path basePath, FileIdentifier identifier, FileVersion version) {
return switch (identifier) {
case WORDS -> switch (version) {
case NEXT -> basePath.resolve("rev-words.dat.next");
case CURRENT -> basePath.resolve("rev-words.dat");
};
case DOCS -> switch (version) {
case NEXT -> basePath.resolve("rev-docs.dat.next");
case CURRENT -> basePath.resolve("rev-docs.dat");
};
};
}
public enum FileVersion {
CURRENT,
NEXT
};
public enum FileIdentifier {
WORDS,
DOCS
}
}

View File

@ -0,0 +1,28 @@
package nu.marginalia.index.priority;
import java.nio.file.Path;
public class ReverseIndexPrioFileNames {
public static Path resolve(Path basePath, FileIdentifier identifier, FileVersion version) {
return switch (identifier) {
case WORDS -> switch (version) {
case NEXT -> basePath.resolve("rev-prio-words.dat.next");
case CURRENT -> basePath.resolve("rev-prio-words.dat");
};
case DOCS -> switch (version) {
case NEXT -> basePath.resolve("rev-prio-docs.dat.next");
case CURRENT -> basePath.resolve("rev-prio-docs.dat");
};
};
}
public enum FileVersion {
CURRENT,
NEXT
};
public enum FileIdentifier {
WORDS,
DOCS
}
}

View File

@ -11,9 +11,9 @@ import nu.marginalia.index.journal.model.IndexJournalEntryData;
import nu.marginalia.index.journal.model.IndexJournalStatistics;
import nu.marginalia.index.journal.reader.IndexJournalReader;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.rwf.RandomWriteFunnel;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -28,7 +28,7 @@ import static nu.marginalia.index.priority.ReverseIndexPriorityParameters.bTreeC
public class ReverseIndexPriorityConverter {
private static final int RWF_BIN_SIZE = 10_000_000;
private final ServiceHeartbeat heartbeat;
private final ProcessHeartbeat heartbeat;
private final Path tmpFileDir;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -39,7 +39,7 @@ public class ReverseIndexPriorityConverter {
private final Path outputFileDocs;
private final SortingContext sortingContext;
public ReverseIndexPriorityConverter(ServiceHeartbeat heartbeat,
public ReverseIndexPriorityConverter(ProcessHeartbeat heartbeat,
Path tmpFileDir,
IndexJournalReader journalReader,
DomainRankings domainRankings,
@ -76,7 +76,7 @@ public class ReverseIndexPriorityConverter {
final Path intermediateUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat");
try (var progress = heartbeat.createServiceTaskHeartbeat(TaskSteps.class, "reverseIndexPriorityConverter")) {
try (var progress = heartbeat.createProcessTaskHeartbeat(TaskSteps.class, "reverseIndexPriorityConverter")) {
progress.progress(TaskSteps.ACCUMULATE_STATISTICS);
final IndexJournalStatistics statistics = journalReader.getStatistics();

View File

@ -10,12 +10,12 @@ import nu.marginalia.index.journal.writer.IndexJournalWriterImpl;
import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessTaskHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.control.ServiceTaskHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -90,12 +90,12 @@ class ReverseIndexFullConverterTest {
var journalReader = new IndexJournalReaderSingleCompressedFile(indexFile);
// RIP fairies
var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class);
when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ServiceTaskHeartbeat.class));
var processHeartbeat = Mockito.mock(ProcessHeartbeat.class);
when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ProcessTaskHeartbeat.class));
new ReverseIndexFullConverter(
serviceHeartbeat,
processHeartbeat,
tmpDir, journalReader, new DomainRankings(), wordsFile, docsFile)
.convert();

View File

@ -12,11 +12,11 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.priority.ReverseIndexPriorityParameters;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessTaskHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.control.ServiceTaskHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -123,12 +123,11 @@ class ReverseIndexFullConverterTest2 {
Path tmpDir = Path.of("/tmp");
// RIP fairies
var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class);
when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ServiceTaskHeartbeat.class));
var processHeartbeat = Mockito.mock(ProcessHeartbeat.class);
when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ProcessTaskHeartbeat.class));
new ReverseIndexFullConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexFullConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexFullReader(wordsFile, docsFile);
@ -153,12 +152,11 @@ class ReverseIndexFullConverterTest2 {
Path tmpDir = Path.of("/tmp");
// RIP fairies
var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class);
when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ServiceTaskHeartbeat.class));
var processHeartbeat = Mockito.mock(ProcessHeartbeat.class);
when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ProcessTaskHeartbeat.class));
new ReverseIndexFullConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexFullConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexFullReader(wordsFile, docsFile);

View File

@ -14,9 +14,9 @@ import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessTaskHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.control.ServiceTaskHeartbeat;
import nu.marginalia.test.TestUtil;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@ -123,12 +123,11 @@ class ReverseIndexPriorityConverterTest2 {
Path tmpDir = Path.of("/tmp");
// RIP fairies
var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class);
when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ServiceTaskHeartbeat.class));
var processHeartbeat = Mockito.mock(ProcessHeartbeat.class);
when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ProcessTaskHeartbeat.class));
new ReverseIndexPriorityConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
new ReverseIndexPriorityConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexPriorityReader(wordsFile, docsFile);
@ -153,12 +152,12 @@ class ReverseIndexPriorityConverterTest2 {
Path tmpDir = Path.of("/tmp");
// RIP fairies
var serviceHeartbeat = Mockito.mock(ServiceHeartbeat.class);
when(serviceHeartbeat.createServiceTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ServiceTaskHeartbeat.class));
new ReverseIndexPriorityConverter(serviceHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
var processHeartbeat = Mockito.mock(ProcessHeartbeat.class);
when(processHeartbeat.createProcessTaskHeartbeat(Mockito.any(), Mockito.any()))
.thenReturn(Mockito.mock(ProcessTaskHeartbeat.class));
new ReverseIndexPriorityConverter(processHeartbeat, tmpDir, new IndexJournalReaderSingleCompressedFile(indexFile, null, ReverseIndexPriorityParameters::filterPriorityRecord), new DomainRankings(), wordsFile, docsFile).convert();
var reverseReader = new ReverseIndexPriorityReader(wordsFile, docsFile);

View File

@ -0,0 +1,57 @@
plugins {
id 'java'
id "io.freefair.lombok" version "8.2.2"
id 'application'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
application {
mainClass = 'nu.marginalia.index.IndexConstructorMain'
applicationName = 'index-construction-process'
}
tasks.distZip.enabled = false
dependencies {
implementation project(':code:api:process-mqapi')
implementation project(':code:common:process')
implementation project(':code:common:service')
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:libraries:message-queue')
implementation project(':code:features-index:index-forward')
implementation project(':code:features-index:index-reverse')
implementation project(':code:features-index:index-journal')
implementation project(':code:features-index:domain-ranking')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.guice
implementation libs.bundles.mariadb
implementation libs.bundles.gson
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
testImplementation project(':code:processes:test-data')
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,214 @@
package nu.marginalia.index;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.forward.ForwardIndexConverter;
import nu.marginalia.index.forward.ForwardIndexFileNames;
import nu.marginalia.index.full.ReverseIndexFullConverter;
import nu.marginalia.index.full.ReverseIndexFullFileNames;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
import nu.marginalia.index.priority.ReverseIndexPrioFileNames;
import nu.marginalia.index.priority.ReverseIndexPriorityConverter;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.mqapi.index.CreateIndexRequest;
import nu.marginalia.mqapi.index.IndexName;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginallia.index.journal.IndexJournalFileNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX;
public class IndexConstructorMain {
private final FileStorageService fileStorageService;
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final DomainRankings domainRankings;
private static final Logger logger = LoggerFactory.getLogger(IndexConstructorMain.class);
private final Gson gson = GsonFactory.get();
public static void main(String[] args) throws Exception {
new org.mariadb.jdbc.Driver();
var main = Guice.createInjector(
new IndexConstructorModule(),
new DatabaseModule())
.getInstance(IndexConstructorMain.class);
var instructions = main.fetchInstructions();
try {
main.run(instructions);
instructions.ok();
}
catch (Exception ex) {
logger.error("Constructor failed", ex);
instructions.err();
}
TimeUnit.SECONDS.sleep(5);
System.exit(0);
}
@Inject
public IndexConstructorMain(FileStorageService fileStorageService,
ProcessHeartbeat heartbeat,
MessageQueueFactory messageQueueFactory,
DomainRankings domainRankings) {
this.fileStorageService = fileStorageService;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.domainRankings = domainRankings;
}
private void run(CreateIndexInstructions instructions) throws SQLException, IOException {
heartbeat.start();
switch (instructions.name) {
case FORWARD -> createForwardIndex();
case REVERSE_FULL -> createFullReverseIndex();
case REVERSE_PRIO -> createPrioReverseIndex();
}
heartbeat.shutDown();
}
private void createFullReverseIndex() throws SQLException, IOException {
FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE);
FileStorage indexStaging = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING);
Path inputFile = IndexJournalFileNames.resolve(indexStaging.asPath());
Path outputFileDocs = ReverseIndexFullFileNames.resolve(indexLive.asPath(), ReverseIndexFullFileNames.FileIdentifier.DOCS, ReverseIndexFullFileNames.FileVersion.NEXT);
Path outputFileWords = ReverseIndexFullFileNames.resolve(indexLive.asPath(), ReverseIndexFullFileNames.FileIdentifier.WORDS, ReverseIndexFullFileNames.FileVersion.NEXT);
Path tmpDir = indexStaging.asPath().resolve("tmp");
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
var journalReader = new IndexJournalReaderSingleCompressedFile(inputFile);
ReverseIndexFullConverter converter = new ReverseIndexFullConverter(
heartbeat,
tmpDir,
journalReader,
domainRankings,
outputFileWords,
outputFileDocs
);
converter.convert();
}
private void createPrioReverseIndex() throws SQLException, IOException {
FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE);
FileStorage indexStaging = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING);
Path inputFile = IndexJournalFileNames.resolve(indexStaging.asPath());
Path outputFileDocs = ReverseIndexPrioFileNames.resolve(indexLive.asPath(), ReverseIndexPrioFileNames.FileIdentifier.DOCS, ReverseIndexPrioFileNames.FileVersion.NEXT);
Path outputFileWords = ReverseIndexPrioFileNames.resolve(indexLive.asPath(), ReverseIndexPrioFileNames.FileIdentifier.WORDS, ReverseIndexPrioFileNames.FileVersion.NEXT);
Path tmpDir = indexStaging.asPath().resolve("tmp");
if (!Files.isDirectory(tmpDir)) Files.createDirectories(tmpDir);
var journalReader = new IndexJournalReaderSingleCompressedFile(inputFile);
ReverseIndexPriorityConverter converter = new ReverseIndexPriorityConverter(
heartbeat,
tmpDir,
journalReader,
domainRankings,
outputFileWords,
outputFileDocs
);
converter.convert();
}
private void createForwardIndex() throws SQLException, IOException {
FileStorage indexLive = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE);
FileStorage indexStaging = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING);
Path inputFile = IndexJournalFileNames.resolve(indexStaging.asPath());
Path outputFileDocsId = ForwardIndexFileNames.resolve(indexLive.asPath(), ForwardIndexFileNames.FileIdentifier.DOC_ID, ForwardIndexFileNames.FileVersion.NEXT);
Path outputFileDocsData = ForwardIndexFileNames.resolve(indexLive.asPath(), ForwardIndexFileNames.FileIdentifier.DOC_DATA, ForwardIndexFileNames.FileVersion.NEXT);
ForwardIndexConverter converter = new ForwardIndexConverter(heartbeat,
inputFile.toFile(),
outputFileDocsId,
outputFileDocsData,
domainRankings
);
converter.convert();
}
private class CreateIndexInstructions {
public final IndexName name;
private final MqSingleShotInbox inbox;
private final MqMessage message;
private CreateIndexInstructions(IndexName name, MqSingleShotInbox inbox, MqMessage message) {
this.name = name;
this.inbox = inbox;
this.message = message;
}
public void ok() {
inbox.sendResponse(message, MqInboxResponse.ok());
}
public void err() {
inbox.sendResponse(message, MqInboxResponse.err());
}
}
private CreateIndexInstructions fetchInstructions() throws Exception {
var inbox = messageQueueFactory.createSingleShotInbox(INDEX_CONSTRUCTOR_INBOX, UUID.randomUUID());
logger.info("Waiting for instructions");
var msgOpt = getMessage(inbox, CreateIndexRequest.class.getSimpleName());
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
var payload = gson.fromJson(msg.payload(), CreateIndexRequest.class);
var name = payload.indexName();
return new CreateIndexInstructions(name, inbox, msg);
}
private Optional<MqMessage> getMessage(MqSingleShotInbox inbox, String expectedFunction) throws SQLException, InterruptedException {
var opt = inbox.waitForMessage(30, TimeUnit.SECONDS);
if (opt.isPresent()) {
if (!opt.get().function().equals(expectedFunction)) {
throw new RuntimeException("Unexpected function: " + opt.get().function());
}
return opt;
}
else {
var stolenMessage = inbox.stealMessage(msg -> msg.function().equals(expectedFunction));
stolenMessage.ifPresent(mqMessage -> logger.info("Stole message {}", mqMessage));
return stolenMessage;
}
}
}

View File

@ -0,0 +1,14 @@
package nu.marginalia.index;
import com.google.inject.AbstractModule;
import nu.marginalia.ProcessConfiguration;
import java.util.UUID;
public class IndexConstructorModule extends AbstractModule {
@Override
public void configure() {
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("index-constructor", 0, UUID.randomUUID()));
}
}

View File

@ -15,6 +15,7 @@ import nu.marginalia.lexicon.KeywordLexicon;
import nu.marginalia.lexicon.journal.KeywordLexiconJournal;
import nu.marginalia.lexicon.journal.KeywordLexiconJournalMode;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginallia.index.journal.IndexJournalFileNames;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -38,7 +39,7 @@ public class LoaderIndexJournalWriter {
var indexArea = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING);
var lexiconPath = lexiconArea.asPath().resolve("dictionary.dat");
var indexPath = indexArea.asPath().resolve("page-index.dat");
var indexPath = IndexJournalFileNames.resolve(indexArea.asPath());
Files.deleteIfExists(indexPath);
Files.deleteIfExists(lexiconPath);

View File

@ -14,9 +14,9 @@ public enum Actor {
CRAWL_JOB_EXTRACTOR,
EXPORT_DATA,
TRUNCATE_LINK_DATABASE,
INDEX_CONSTRUCTOR_MONITOR,
CONVERT;
public String id() {
return "fsm:" + name().toLowerCase();
}

View File

@ -44,6 +44,7 @@ public class ControlActors {
MessageQueueMonitorActor messageQueueMonitor,
ProcessLivenessMonitorActor processMonitorFSM,
FileStorageMonitorActor fileStorageMonitorActor,
IndexConstructorMonitorActor indexConstructorMonitorActor,
TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor,
CrawlJobExtractorActor crawlJobExtractorActor,
ExportDataActor exportDataActor,
@ -58,6 +59,7 @@ public class ControlActors {
register(Actor.CONVERT, convertActor);
register(Actor.CONVERT_AND_LOAD, convertAndLoadActor);
register(Actor.INDEX_CONSTRUCTOR_MONITOR, indexConstructorMonitorActor);
register(Actor.CONVERTER_MONITOR, converterMonitorFSM);
register(Actor.LOADER_MONITOR, loaderMonitor);
register(Actor.CRAWLER_MONITOR, crawlerMonitorActor);

View File

@ -119,14 +119,14 @@ public class AbstractProcessSpawnerActor extends AbstractActorPrototype {
if (attempts < MAX_ATTEMPTS) transition(RUN, attempts + 1);
else error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(10)) {
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 10 seconds. This might happen if
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
error("Process terminated within 10 seconds of starting");
error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {

View File

@ -0,0 +1,22 @@
package nu.marginalia.control.actor.monitor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.control.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
@Singleton
public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public IndexConstructorMonitorActor(ActorStateFactory stateFactory,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory, persistence, processService, ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX, ProcessService.ProcessId.INDEX_CONSTRUCTOR);
}
}

View File

@ -13,6 +13,8 @@ import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.mqapi.converting.ConvertRequest;
import nu.marginalia.mqapi.index.CreateIndexRequest;
import nu.marginalia.mqapi.index.IndexName;
import nu.marginalia.mqapi.loading.LoadRequest;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageBaseType;
@ -45,14 +47,19 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
public static final String REPARTITION = "REPARTITION";
public static final String REPARTITION_WAIT = "REPARTITION-WAIT";
public static final String REINDEX = "REINDEX";
public static final String REINDEX_WAIT = "REINDEX-WAIT";
public static final String SWITCH_LINKDB = "SWITCH-LINKDB";
public static final String REINDEX_FWD = "REINDEX_FWD";
public static final String REINDEX_FWD_WAIT = "REINDEX-FWD-WAIT";
public static final String REINDEX_FULL = "REINDEX_FULL";
public static final String REINDEX_FULL_WAIT = "REINDEX-FULL-WAIT";
public static final String REINDEX_PRIO = "REINDEX_PRIO";
public static final String REINDEX_PRIO_WAIT = "REINDEX-PRIO-WAIT";
public static final String SWITCH_OVER = "SWITCH-LINKDB";
public static final String END = "END";
private final ActorProcessWatcher processWatcher;
private final MqOutbox mqConverterOutbox;
private final MqOutbox mqLoaderOutbox;
private final MqOutbox mqIndexConstructorOutbox;
private final MqOutbox indexOutbox;
private final MqOutbox searchOutbox;
private final FileStorageService storageService;
@ -89,6 +96,7 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
this.searchOutbox = searchClient.outbox();
this.mqConverterOutbox = processOutboxes.getConverterOutbox();
this.mqLoaderOutbox = processOutboxes.getLoaderOutbox();
this.mqIndexConstructorOutbox = processOutboxes.getIndexConstructorOutbox();
this.storageService = storageService;
this.gson = gson;
}
@ -228,7 +236,7 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
@ActorState(
name = REPARTITION_WAIT,
next = REINDEX,
next = REINDEX_FWD,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the index-service to finish repartitioning the index.
@ -243,26 +251,27 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
}
@ActorState(
name = REINDEX,
next = REINDEX_WAIT,
name = REINDEX_FWD,
next = REINDEX_FWD_WAIT,
description = """
Instruct the index-service to reindex the data then transition to REINDEX_WAIT.
Reconstruct the fwd index
"""
)
public Long reindex() throws Exception {
return indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REINDEX, "");
public Long reindexFwd() throws Exception {
var request = new CreateIndexRequest(IndexName.FORWARD);
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(
name = REINDEX_WAIT,
next = SWITCH_LINKDB,
name = REINDEX_FWD_WAIT,
next = REINDEX_FULL,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the index-service to finish reindexing the data.
Wait for the reindex job to finish.
"""
)
public void reindexReply(Long id) throws Exception {
var rsp = indexOutbox.waitResponse(id);
public void reindexFwdWait(Long id) throws Exception {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
@ -270,15 +279,74 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
}
@ActorState(
name = SWITCH_LINKDB,
name = REINDEX_FULL,
next = REINDEX_FULL_WAIT,
description = """
Reconstruct the full index
"""
)
public Long reindexFull() throws Exception {
var request = new CreateIndexRequest(IndexName.REVERSE_FULL);
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(
name = REINDEX_FULL_WAIT,
next = REINDEX_PRIO,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the reindex job to finish.
"""
)
public void reindexFullWait(Long id) throws Exception {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@ActorState(
name = REINDEX_PRIO,
next = REINDEX_PRIO_WAIT,
resume = ActorResumeBehavior.RETRY,
description = """
Reconstruct the prio index
"""
)
public long reindexPrio() throws Exception {
var request = new CreateIndexRequest(IndexName.REVERSE_PRIO);
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(
name = REINDEX_PRIO_WAIT,
next = SWITCH_OVER,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the reindex job to finish.
"""
)
public void reindexPrioWait(Long id) throws Exception {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@ActorState(
name = SWITCH_OVER,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Instruct the search service to switch to the new linkdb
Instruct the search service to switch to the new linkdb,
and the index service to switch over to the new index.
"""
)
public void switchLinkdb(Long id) throws Exception {
public void switchOver(Long id) throws Exception {
searchOutbox.sendNotice(SearchMqEndpoints.SWITCH_LINKDB, ":-)");
indexOutbox.sendNotice(IndexMqEndpoints.INDEX_REINDEX, ":^D");
}
}

View File

@ -44,6 +44,7 @@ public record ProcessHeartbeat(
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "crawl-job-extractor" -> ProcessService.ProcessId.CRAWL_JOB_EXTRACTOR;
case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR;
default -> null;
};
}

View File

@ -12,6 +12,7 @@ public class ProcessOutboxes {
private final MqOutbox converterOutbox;
private final MqOutbox loaderOutbox;
private final MqOutbox crawlerOutbox;
private final MqOutbox indexConstructorOutbox;
@Inject
public ProcessOutboxes(BaseServiceParams params, MqPersistence persistence) {
@ -30,6 +31,11 @@ public class ProcessOutboxes {
params.configuration.serviceName(),
params.configuration.instanceUuid()
);
indexConstructorOutbox = new MqOutbox(persistence,
ProcessInboxNames.INDEX_CONSTRUCTOR_INBOX,
params.configuration.serviceName(),
params.configuration.instanceUuid()
);
}
@ -44,4 +50,6 @@ public class ProcessOutboxes {
public MqOutbox getCrawlerOutbox() {
return crawlerOutbox;
}
public MqOutbox getIndexConstructorOutbox() { return indexConstructorOutbox; }
}

View File

@ -32,6 +32,7 @@ public class ProcessService {
CRAWLER("crawler-process/bin/crawler-process"),
CONVERTER("converter-process/bin/converter-process"),
LOADER("loader-process/bin/loader-process"),
INDEX_CONSTRUCTOR("index-construction-process/bin/index-construction-process"),
ADJACENCIES_CALCULATOR("website-adjacencies-calculator/bin/website-adjacencies-calculator"),
CRAWL_JOB_EXTRACTOR("crawl-job-extractor-process/bin/crawl-job-extractor-process")
;

View File

@ -11,6 +11,7 @@ import nu.marginalia.index.svc.IndexSearchSetsService;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.*;
import nu.marginalia.service.server.mq.MqNotification;
import nu.marginalia.service.server.mq.MqRequest;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
@ -93,7 +94,7 @@ public class IndexService extends Service {
return "ok";
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_REINDEX)
@MqNotification(endpoint = IndexMqEndpoints.INDEX_REINDEX)
public String reindex(String message) throws Exception {
if (!opsService.reindex()) {
throw new IllegalStateException("Ops lock busy");
@ -112,34 +113,8 @@ public class IndexService extends Service {
searchIndex.init();
initialized = true;
}
if (!opsService.run(this::autoConvert)) {
logger.warn("Auto-convert could not be performed, ops lock busy");
}
}
private void autoConvert() {
if (!servicesFactory.isConvertedIndexMissing()
|| !servicesFactory.isPreconvertedIndexPresent()
|| Boolean.getBoolean("no-auto-convert")
) {
return;
}
try {
eventLog.logEvent("INDEX-AUTO-CONVERT-BEGIN", "");
logger.info("Auto-converting");
searchSetsService.recalculateAll();
searchIndex.switchIndex();
eventLog.logEvent("INDEX-AUTO-CONVERT-END", "");
logger.info("Auto-conversion finished!");
}
catch (IOException ex) {
logger.error("Auto convert failed", ex);
}
}
}

View File

@ -5,8 +5,11 @@ import com.google.inject.Singleton;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.forward.ForwardIndexConverter;
import nu.marginalia.index.forward.ForwardIndexFileNames;
import nu.marginalia.index.forward.ForwardIndexReader;
import nu.marginalia.index.full.ReverseIndexFullFileNames;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleCompressedFile;
import nu.marginalia.index.priority.ReverseIndexPrioFileNames;
import nu.marginalia.index.priority.ReverseIndexPriorityConverter;
import nu.marginalia.index.full.ReverseIndexFullConverter;
import nu.marginalia.index.priority.ReverseIndexPriorityReader;
@ -15,6 +18,7 @@ import nu.marginalia.index.full.ReverseIndexFullReader;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.index.index.SearchIndexReader;
import nu.marginalia.service.control.ServiceHeartbeat;
import org.checkerframework.checker.units.qual.C;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -29,23 +33,8 @@ import java.util.stream.Stream;
@Singleton
public class IndexServicesFactory {
private final Path tmpFileDir;
private final ServiceHeartbeat heartbeat;
private final Path liveStorage;
private final Path stagingStorage;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Path writerIndexFile;
private final PartitionedDataFile fwdIndexDocId;
private final PartitionedDataFile fwdIndexDocData;
private final PartitionedDataFile revIndexDoc;
private final PartitionedDataFile revIndexWords;
private final PartitionedDataFile revPrioIndexDoc;
private final PartitionedDataFile revPrioIndexWords;
private final Path searchSetsBase;
final int LIVE_PART = 0;
@ -56,165 +45,58 @@ public class IndexServicesFactory {
ServiceHeartbeat heartbeat,
FileStorageService fileStorageService
) throws IOException, SQLException {
this.heartbeat = heartbeat;
liveStorage = fileStorageService.getStorageByType(FileStorageType.INDEX_LIVE).asPath();
stagingStorage = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING).asPath();
tmpFileDir = fileStorageService.getStorageByType(FileStorageType.INDEX_STAGING).asPath().resolve("tmp");
searchSetsBase = fileStorageService.getStorageByType(FileStorageType.SEARCH_SETS).asPath();
if (!Files.exists(tmpFileDir)) {
Files.createDirectories(tmpFileDir);
}
writerIndexFile = stagingStorage.resolve("page-index.dat");
fwdIndexDocId = new PartitionedDataFile(liveStorage, "fwd-doc-id.dat");
fwdIndexDocData = new PartitionedDataFile(liveStorage, "fwd-doc-data.dat");
revIndexDoc = new PartitionedDataFile(liveStorage, "rev-doc.dat");
revIndexWords = new PartitionedDataFile(liveStorage, "rev-words.dat");
revPrioIndexDoc = new PartitionedDataFile(liveStorage, "rev-prio-doc.dat");
revPrioIndexWords = new PartitionedDataFile(liveStorage, "rev-prio-words.dat");
}
public Path getSearchSetsBase() {
return searchSetsBase;
}
public boolean isPreconvertedIndexPresent() {
return Stream.of(
writerIndexFile
).allMatch(Files::exists);
}
public boolean isConvertedIndexMissing() {
return Stream.of(
revIndexWords.get(LIVE_PART).toPath(),
revIndexDoc.get(LIVE_PART).toPath(),
revPrioIndexWords.get(LIVE_PART).toPath(),
revPrioIndexDoc.get(LIVE_PART).toPath(),
fwdIndexDocData.get(LIVE_PART).toPath(),
fwdIndexDocId.get(LIVE_PART).toPath()
).noneMatch(Files::exists);
}
enum ConvertSteps {
FORWARD_INDEX,
FULL_REVERSE_INDEX,
PRIORITY_REVERSE_INDEX,
FINISHED
}
public void convertIndex(DomainRankings domainRankings) throws IOException {
try (var hb = heartbeat.createServiceTaskHeartbeat(ConvertSteps.class, "index-conversion")) {
hb.progress(ConvertSteps.FORWARD_INDEX);
convertForwardIndex(domainRankings);
hb.progress(ConvertSteps.FULL_REVERSE_INDEX);
convertFullReverseIndex(domainRankings);
hb.progress(ConvertSteps.PRIORITY_REVERSE_INDEX);
convertPriorityReverseIndex(domainRankings);
hb.progress(ConvertSteps.FINISHED);
}
}
private void convertFullReverseIndex(DomainRankings domainRankings) throws IOException {
logger.info("Converting full reverse index {}", writerIndexFile);
var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile);
var converter = new ReverseIndexFullConverter(
heartbeat,
tmpFileDir,
journalReader,
domainRankings,
revIndexWords.get(NEXT_PART).toPath(),
revIndexDoc.get(NEXT_PART).toPath());
converter.convert();
tryGc();
}
private void convertPriorityReverseIndex(DomainRankings domainRankings) throws IOException {
logger.info("Converting priority reverse index {}", writerIndexFile);
var journalReader = new IndexJournalReaderSingleCompressedFile(writerIndexFile, null,
ReverseIndexPriorityParameters::filterPriorityRecord);
var converter = new ReverseIndexPriorityConverter(heartbeat,
tmpFileDir,
journalReader,
domainRankings,
revPrioIndexWords.get(NEXT_PART).toPath(),
revPrioIndexDoc.get(NEXT_PART).toPath());
converter.convert();
tryGc();
}
private void convertForwardIndex(DomainRankings domainRankings) throws IOException {
logger.info("Converting forward index data {}", writerIndexFile);
new ForwardIndexConverter(heartbeat,
writerIndexFile.toFile(),
fwdIndexDocId.get(NEXT_PART).toPath(),
fwdIndexDocData.get(NEXT_PART).toPath(),
domainRankings)
.convert();
tryGc();
}
public void tryGc() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.gc();
}
public ReverseIndexFullReader getReverseIndexReader() throws IOException {
return new ReverseIndexFullReader(
revIndexWords.get(LIVE_PART).toPath(),
revIndexDoc.get(LIVE_PART).toPath());
}
public ReverseIndexPriorityReader getReverseIndexPrioReader() throws IOException {
return new ReverseIndexPriorityReader(
revPrioIndexWords.get(LIVE_PART).toPath(),
revPrioIndexDoc.get(LIVE_PART).toPath());
}
public ForwardIndexReader getForwardIndexReader() throws IOException {
return new ForwardIndexReader(
fwdIndexDocId.get(LIVE_PART).toPath(),
fwdIndexDocData.get(LIVE_PART).toPath()
ReverseIndexFullFileNames.resolve(liveStorage, ReverseIndexFullFileNames.FileIdentifier.WORDS, ReverseIndexFullFileNames.FileVersion.CURRENT),
ReverseIndexFullFileNames.resolve(liveStorage, ReverseIndexFullFileNames.FileIdentifier.DOCS, ReverseIndexFullFileNames.FileVersion.CURRENT)
);
}
public Callable<Boolean> switchFilesJob() {
return () -> {
public ReverseIndexPriorityReader getReverseIndexPrioReader() throws IOException {
return new ReverseIndexPriorityReader(
ReverseIndexPrioFileNames.resolve(liveStorage, ReverseIndexPrioFileNames.FileIdentifier.WORDS, ReverseIndexPrioFileNames.FileVersion.CURRENT),
ReverseIndexPrioFileNames.resolve(liveStorage, ReverseIndexPrioFileNames.FileIdentifier.DOCS, ReverseIndexPrioFileNames.FileVersion.CURRENT)
);
}
switchFile(revIndexDoc.get(NEXT_PART).toPath(), revIndexDoc.get(LIVE_PART).toPath());
switchFile(revIndexWords.get(NEXT_PART).toPath(), revIndexWords.get(LIVE_PART).toPath());
public ForwardIndexReader getForwardIndexReader() throws IOException {
return new ForwardIndexReader(
ForwardIndexFileNames.resolve(liveStorage, ForwardIndexFileNames.FileIdentifier.DOC_ID, ForwardIndexFileNames.FileVersion.CURRENT),
ForwardIndexFileNames.resolve(liveStorage, ForwardIndexFileNames.FileIdentifier.DOC_DATA, ForwardIndexFileNames.FileVersion.CURRENT)
);
}
switchFile(revPrioIndexDoc.get(NEXT_PART).toPath(), revPrioIndexDoc.get(LIVE_PART).toPath());
switchFile(revPrioIndexWords.get(NEXT_PART).toPath(), revPrioIndexWords.get(LIVE_PART).toPath());
public void switchFiles() throws IOException {
switchFile(fwdIndexDocId.get(NEXT_PART).toPath(), fwdIndexDocId.get(LIVE_PART).toPath());
switchFile(fwdIndexDocData.get(NEXT_PART).toPath(), fwdIndexDocData.get(LIVE_PART).toPath());
return true;
};
for (var file : ReverseIndexFullFileNames.FileIdentifier.values()) {
switchFile(
ReverseIndexFullFileNames.resolve(liveStorage, file, ReverseIndexFullFileNames.FileVersion.NEXT),
ReverseIndexFullFileNames.resolve(liveStorage, file, ReverseIndexFullFileNames.FileVersion.CURRENT)
);
}
for (var file : ReverseIndexPrioFileNames.FileIdentifier.values()) {
switchFile(
ReverseIndexPrioFileNames.resolve(liveStorage, file, ReverseIndexPrioFileNames.FileVersion.NEXT),
ReverseIndexPrioFileNames.resolve(liveStorage, file, ReverseIndexPrioFileNames.FileVersion.CURRENT)
);
}
for (var file : ForwardIndexFileNames.FileIdentifier.values()) {
switchFile(
ForwardIndexFileNames.resolve(liveStorage, file, ForwardIndexFileNames.FileVersion.NEXT),
ForwardIndexFileNames.resolve(liveStorage, file, ForwardIndexFileNames.FileVersion.CURRENT)
);
}
}
public void switchFile(Path from, Path to) throws IOException {
@ -231,37 +113,3 @@ public class IndexServicesFactory {
);
}
}
class RootDataFile {
private final Path partition;
private final String pattern;
RootDataFile(Path partition, String pattern) {
this.partition = partition;
this.pattern = pattern;
}
public File get() {
return partition.resolve(pattern).toFile();
}
}
class PartitionedDataFile {
private final Path partition;
private final String pattern;
PartitionedDataFile(Path partition, String pattern) {
this.partition = partition;
this.pattern = pattern;
}
public File get(Object id) {
Path partitionDir = partition.resolve(id.toString());
if (!partitionDir.toFile().exists()) {
partitionDir.toFile().mkdir();
}
return partitionDir.resolve(pattern).toFile();
}
}

View File

@ -62,8 +62,6 @@ public class SearchIndex {
else {
eventLog.logEvent("INDEX-INIT", "No index loaded");
}
}
catch (Exception ex) {
logger.error("Uncaught exception", ex);
@ -74,19 +72,12 @@ public class SearchIndex {
}
public boolean switchIndex() throws IOException {
eventLog.logEvent("CONVERT-INDEX-BEGIN", "");
servicesFactory.convertIndex(searchSetsService.getDomainRankings());
eventLog.logEvent("CONVERT-INDEX-END", "");
System.gc();
eventLog.logEvent("INDEX-SWITCH-BEGIN", "");
Lock lock = indexReplacementLock.writeLock();
try {
lock.lock();
servicesFactory.switchFilesJob().call();
servicesFactory.switchFiles();
indexReader = servicesFactory.getSearchIndexReader();
eventLog.logEvent("INDEX-SWITCH-OK", "");

View File

@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.2-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.3-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists

View File

@ -61,6 +61,7 @@ include 'code:common:process'
include 'code:processes:converting-process'
include 'code:processes:crawling-process'
include 'code:processes:loading-process'
include 'code:processes:index-constructor-process'
include 'code:processes:test-data'
include 'code:process-models:converting-model'