diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index eefb2be2..7afab740 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -92,8 +92,6 @@ public class ConverterMain { final int maxPoolSize = Runtime.getRuntime().availableProcessors(); - - try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile()); ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir())) { @@ -111,6 +109,7 @@ public class ConverterMain { pool.submit(() -> { ProcessedDomain processed = processor.process(domain); converterWriter.accept(processed); + heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains); }); } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java index 595601a5..ac0dd71c 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/writer/ConverterWriter.java @@ -3,6 +3,8 @@ package nu.marginalia.converting.writer; import lombok.SneakyThrows; import nu.marginalia.converting.model.ProcessedDomain; import nu.marginalia.worklog.BatchingWorkLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.time.Duration; @@ -14,19 +16,22 @@ import java.util.concurrent.TimeUnit; public class ConverterWriter implements AutoCloseable { + private static final Logger logger = LoggerFactory.getLogger(ConverterWriter.class); + private final BatchingWorkLog workLog; private final Path basePath; - private final Duration switchInterval = - Duration.of(10, ChronoUnit.MINUTES); - private final ArrayBlockingQueue domainData = - new ArrayBlockingQueue<>(4); + private final Duration switchInterval + = Duration.of(10, ChronoUnit.MINUTES); + private final ArrayBlockingQueue domainData + = new ArrayBlockingQueue<>(4); private final Thread workerThread; - ConverterBatchWriter writer; + private ConverterBatchWriter currentWriter; volatile boolean running = true; + public ConverterWriter(BatchingWorkLog workLog, Path basePath) { this.workLog = workLog; this.basePath = basePath; @@ -44,20 +49,27 @@ public class ConverterWriter implements AutoCloseable { private void writerThread() { IntervalAction switcher = new IntervalAction(this::switchBatch, switchInterval); - writer = new ConverterBatchWriter(basePath, workLog.getBatchNumber()); + currentWriter = new ConverterBatchWriter(basePath, workLog.getBatchNumber()); while (running || !domainData.isEmpty()) { - var data = domainData.poll(10, TimeUnit.SECONDS); + // poll with a timeout so we have an + // opportunity to check the running condition + // ... we could interrupt the thread as well, but + // as we enter third party code it's difficult to guarantee it will deal + // well with being interrupted + var data = domainData.poll(1, TimeUnit.SECONDS); if (data == null) continue; String id = data.id; - if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) + if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) { + logger.warn("Skipping already logged item {}", id); continue; + } - writer.write(data); + currentWriter.write(data); workLog.logItem(id); @@ -72,10 +84,12 @@ public class ConverterWriter implements AutoCloseable { return false; } + // order matters here - writer.close(); + currentWriter.close(); workLog.logFinishedBatch(); - writer = new ConverterBatchWriter(basePath, workLog.getBatchNumber()); + logger.info("Switching to batch {}", workLog.getBatchNumber()); + currentWriter = new ConverterBatchWriter(basePath, workLog.getBatchNumber()); return true; } @@ -86,7 +100,7 @@ public class ConverterWriter implements AutoCloseable { workerThread.join(); // order matters here - writer.close(); + currentWriter.close(); workLog.logFinishedBatch(); } } @@ -105,17 +119,17 @@ class IntervalAction { /** Execute the provided action if enough time has passed * since the last successful invocation */ public void tick() { + var now = Instant.now(); if (nextActionInstant == null) { - nextActionInstant = Instant.now().plus(interval); + nextActionInstant = now.plus(interval); return; } - if (Instant.now().isBefore(nextActionInstant)) - return; - try { - if (action.call()) { - nextActionInstant = Instant.now().plus(interval); + if (now.isAfter(nextActionInstant) + && action.call()) + { + nextActionInstant = now.plus(interval); } } catch (Exception ex) { diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/LoaderIndexJournalWriter.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderIndexJournalWriter.java similarity index 98% rename from code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/LoaderIndexJournalWriter.java rename to code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderIndexJournalWriter.java index 0bfd6193..c3c9d6f9 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/LoaderIndexJournalWriter.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderIndexJournalWriter.java @@ -1,4 +1,4 @@ -package nu.marginalia.loading.documents; +package nu.marginalia.loading; import com.google.inject.Inject; import com.google.inject.Singleton; diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index 675feb3d..46f206db 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -9,7 +9,6 @@ import nu.marginalia.db.storage.FileStorageService; import nu.marginalia.linkdb.LinkdbWriter; import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService; -import nu.marginalia.loading.documents.LoaderIndexJournalWriter; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainLoaderService; import nu.marginalia.loading.links.DomainLinksLoaderService; @@ -26,8 +25,11 @@ import org.slf4j.LoggerFactory; import java.nio.file.Path; import java.sql.SQLException; +import java.util.List; import java.util.Optional; import java.util.UUID; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX; @@ -106,12 +108,20 @@ public class LoaderMain { validBatchCount); try { - linksService - .loadLinks(domainIdRegistry, inputDataDir, validBatchCount); - keywordLoaderService - .loadKeywords(domainIdRegistry, inputDataDir, validBatchCount); - documentLoaderService - .loadDocuments(domainIdRegistry, inputDataDir, validBatchCount); + var results = ForkJoinPool.commonPool() + .invokeAll( + List.of( + () -> linksService.loadLinks(domainIdRegistry, heartbeat, inputDataDir, validBatchCount), + () -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputDataDir, validBatchCount), + () -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputDataDir, validBatchCount) + ) + ); + + for (var result : results) { + if (result.state() == Future.State.FAILED) { + throw result.exceptionNow(); + } + } instructions.ok(); } @@ -125,7 +135,6 @@ public class LoaderMain { heartbeat.shutDown(); } - System.exit(0); } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java index c6ea5a5e..130957f8 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java @@ -11,6 +11,9 @@ import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.processed.DocumentRecordMetadataProjection; +import nu.marginalia.process.control.ProcessHeartbeat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Path; @@ -20,6 +23,8 @@ import java.util.List; @Singleton public class DocumentLoaderService { + private static final Logger logger = LoggerFactory.getLogger(DocumentLoaderService.class); + private final LinkdbWriter linkdbWriter; @Inject @@ -27,15 +32,30 @@ public class DocumentLoaderService { this.linkdbWriter = linkdbWriter; } - public void loadDocuments(DomainIdRegistry domainIdRegistry, + public boolean loadDocuments( + DomainIdRegistry domainIdRegistry, + ProcessHeartbeat processHeartbeat, Path processedDataPathBase, int untilBatch) throws IOException, SQLException { var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch); - for (var file : documentFiles) { - loadDocumentsFromFile(domainIdRegistry, file); + + try (var taskHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("DOCUMENTS")) { + + int processed = 0; + + for (var file : documentFiles) { + taskHeartbeat.progress("LOAD", processed++, documentFiles.size()); + + loadDocumentsFromFile(domainIdRegistry, file); + } + taskHeartbeat.progress("LOAD", processed, documentFiles.size()); } + + logger.info("Finished"); + + return true; } private void loadDocumentsFromFile(DomainIdRegistry domainIdRegistry, Path file) @@ -45,6 +65,8 @@ public class DocumentLoaderService { LinkdbLoader loader = new LinkdbLoader(domainIdRegistry) ) { + logger.info("Loading document meta from {}", file); + stream.forEach(loader::accept); } } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/KeywordLoaderService.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/KeywordLoaderService.java index ef9b619e..9d9d0a4d 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/KeywordLoaderService.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/KeywordLoaderService.java @@ -5,15 +5,20 @@ import com.google.inject.Singleton; import nu.marginalia.io.processed.DocumentRecordParquetFileReader; import nu.marginalia.io.processed.ProcessedDataFileNames; import nu.marginalia.keyword.model.DocumentKeywords; +import nu.marginalia.loading.LoaderIndexJournalWriter; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.processed.DocumentRecordKeywordsProjection; +import nu.marginalia.process.control.ProcessHeartbeat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Path; @Singleton public class KeywordLoaderService { + private static final Logger logger = LoggerFactory.getLogger(KeywordLoaderService.class); private final LoaderIndexJournalWriter writer; @Inject @@ -21,17 +26,33 @@ public class KeywordLoaderService { this.writer = writer; } - public void loadKeywords(DomainIdRegistry domainIdRegistry, + public boolean loadKeywords(DomainIdRegistry domainIdRegistry, + ProcessHeartbeat heartbeat, Path processedDataPathBase, int untilBatch) throws IOException { - var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch); - for (var file : documentFiles) { - loadKeywordsFromFile(domainIdRegistry, file); + try (var task = heartbeat.createAdHocTaskHeartbeat("KEYWORDS")) { + + var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch); + int processed = 0; + + for (var file : documentFiles) { + task.progress("LOAD", processed++, documentFiles.size()); + + loadKeywordsFromFile(domainIdRegistry, file); + } + + task.progress("LOAD", processed, documentFiles.size()); } + + logger.info("Finished"); + + return true; } private void loadKeywordsFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException { try (var stream = DocumentRecordParquetFileReader.streamKeywordsProjection(file)) { + logger.info("Loading keywords from {}", file); + stream.filter(DocumentRecordKeywordsProjection::hasKeywords) .forEach(proj -> insertKeywords(domainIdRegistry, proj)); } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainIdRegistry.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainIdRegistry.java index cb825641..2ab6ba46 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainIdRegistry.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/domains/DomainIdRegistry.java @@ -1,7 +1,5 @@ package nu.marginalia.loading.domains; -import nu.marginalia.model.EdgeDomain; - import java.util.HashMap; import java.util.Map; @@ -20,15 +18,6 @@ public class DomainIdRegistry { return id; } - public int getDomainId(EdgeDomain domainName) { - return getDomainId(domainName.toString()); - } - - - public boolean isKnown(String domainName) { - return domainIds.containsKey(domainName); - } - void add(String domainName, int id) { domainIds.put(domainName, id); } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java index c70647a6..d38112ca 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java @@ -7,6 +7,7 @@ import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader; import nu.marginalia.io.processed.ProcessedDataFileNames; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.processed.DomainLinkRecord; +import nu.marginalia.process.control.ProcessHeartbeat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,14 +28,29 @@ public class DomainLinksLoaderService { this.dataSource = dataSource; } - public void loadLinks(DomainIdRegistry domainIdRegistry, Path processedDataPathBase, int untilBatch) throws IOException, SQLException { + public boolean loadLinks(DomainIdRegistry domainIdRegistry, + ProcessHeartbeat heartbeat, + Path processedDataPathBase, + int untilBatch) throws IOException, SQLException { dropLinkData(); - var linkFiles = ProcessedDataFileNames.listDomainLinkFiles(processedDataPathBase, untilBatch); - for (var file : linkFiles) { - loadLinksFromFile(domainIdRegistry, file); + try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS")) { + var linkFiles = ProcessedDataFileNames.listDomainLinkFiles(processedDataPathBase, untilBatch); + + int processed = 0; + + for (var file : linkFiles) { + task.progress("LOAD", processed++, linkFiles.size()); + + loadLinksFromFile(domainIdRegistry, file); + } + + task.progress("LOAD", processed, linkFiles.size()); } + + logger.info("Finished"); + return true; } private void dropLinkData() throws SQLException { diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java index 9340fe14..a3d16162 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/domains/DomainLoaderServiceTest.java @@ -7,10 +7,10 @@ import nu.marginalia.io.processed.ProcessedDataFileNames; import nu.marginalia.loader.DbTestUtil; import nu.marginalia.model.processed.DomainLinkRecord; import nu.marginalia.model.processed.DomainRecord; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; +import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat; +import nu.marginalia.process.control.ProcessHeartbeat; +import org.junit.jupiter.api.*; +import org.mockito.Mockito; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.*; @Testcontainers class DomainLoaderServiceTest { List toDelete = new ArrayList<>(); + ProcessHeartbeat heartbeat; @Container static MariaDBContainer mariaDBContainer = new MariaDBContainer<>("mariadb") @@ -38,6 +39,15 @@ class DomainLoaderServiceTest { .withInitScript("db/migration/V23_06_0_000__base.sql") .withNetworkAliases("mariadb"); + @BeforeEach + public void setUp() { + heartbeat = Mockito.mock(ProcessHeartbeat.class); + + Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn( + Mockito.mock(ProcessAdHocTaskHeartbeat.class) + ); + } + @AfterEach public void tearDown() throws IOException { for (var path : Lists.reverse(toDelete)) { diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java index 2f1f9b00..df80020f 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java @@ -8,9 +8,14 @@ import nu.marginalia.loader.DbTestUtil; import nu.marginalia.loading.domains.DomainLoaderService; import nu.marginalia.model.processed.DomainLinkRecord; import nu.marginalia.model.processed.DomainRecord; +import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat; +import nu.marginalia.process.control.ProcessHeartbeat; +import nu.marginalia.process.control.ProcessHeartbeatImpl; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.mockito.Mockito; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.junit.jupiter.Container; import org.testcontainers.junit.jupiter.Testcontainers; @@ -27,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; @Testcontainers class DomainLinksLoaderServiceTest { List toDelete = new ArrayList<>(); + ProcessHeartbeat heartbeat; @Container static MariaDBContainer mariaDBContainer = new MariaDBContainer<>("mariadb") @@ -36,6 +42,15 @@ class DomainLinksLoaderServiceTest { .withInitScript("db/migration/V23_06_0_000__base.sql") .withNetworkAliases("mariadb"); + @BeforeEach + public void setUp() { + heartbeat = Mockito.mock(ProcessHeartbeat.class); + + Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn( + Mockito.mock(ProcessAdHocTaskHeartbeat.class) + ); + } + @AfterEach public void tearDown() throws IOException { for (var path : Lists.reverse(toDelete)) { @@ -87,7 +102,7 @@ class DomainLinksLoaderServiceTest { var domainRegistry = domainService.getOrCreateDomainIds(workDir, 2); var dls = new DomainLinksLoaderService(dataSource); - dls.loadLinks(domainRegistry, workDir, 2); + dls.loadLinks(domainRegistry, heartbeat, workDir, 2); Map> expected = new HashMap<>(); Map> actual = new HashMap<>(); diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java index 709b3110..472e692d 100644 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java +++ b/code/processes/loading-process/src/test/java/nu/marginalia/loading/loader/LoaderIndexJournalWriterTest.java @@ -5,7 +5,7 @@ import nu.marginalia.db.storage.model.FileStorage; import nu.marginalia.db.storage.model.FileStorageType; import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile; import nu.marginalia.keyword.model.DocumentKeywords; -import nu.marginalia.loading.documents.LoaderIndexJournalWriter; +import nu.marginalia.loading.LoaderIndexJournalWriter; import nu.marginalia.model.idx.DocumentMetadata; import nu.marginallia.index.journal.IndexJournalFileNames; import org.junit.jupiter.api.AfterEach;