(converter) Add heartbeats to the loader processes and execute the tasks in parallel for a ~2X speedup

This commit is contained in:
Viktor Lofgren 2023-09-14 10:11:57 +02:00
parent 87a8593291
commit c71f6ad417
11 changed files with 152 additions and 57 deletions

View File

@ -92,8 +92,6 @@ public class ConverterMain {
final int maxPoolSize = Runtime.getRuntime().availableProcessors();
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile());
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir()))
{
@ -111,6 +109,7 @@ public class ConverterMain {
pool.submit(() -> {
ProcessedDomain processed = processor.process(domain);
converterWriter.accept(processed);
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
});
}

View File

@ -3,6 +3,8 @@ package nu.marginalia.converting.writer;
import lombok.SneakyThrows;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.worklog.BatchingWorkLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.time.Duration;
@ -14,19 +16,22 @@ import java.util.concurrent.TimeUnit;
public class ConverterWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConverterWriter.class);
private final BatchingWorkLog workLog;
private final Path basePath;
private final Duration switchInterval =
Duration.of(10, ChronoUnit.MINUTES);
private final ArrayBlockingQueue<ProcessedDomain> domainData =
new ArrayBlockingQueue<>(4);
private final Duration switchInterval
= Duration.of(10, ChronoUnit.MINUTES);
private final ArrayBlockingQueue<ProcessedDomain> domainData
= new ArrayBlockingQueue<>(4);
private final Thread workerThread;
ConverterBatchWriter writer;
private ConverterBatchWriter currentWriter;
volatile boolean running = true;
public ConverterWriter(BatchingWorkLog workLog, Path basePath) {
this.workLog = workLog;
this.basePath = basePath;
@ -44,20 +49,27 @@ public class ConverterWriter implements AutoCloseable {
private void writerThread() {
IntervalAction switcher = new IntervalAction(this::switchBatch, switchInterval);
writer = new ConverterBatchWriter(basePath, workLog.getBatchNumber());
currentWriter = new ConverterBatchWriter(basePath, workLog.getBatchNumber());
while (running || !domainData.isEmpty()) {
var data = domainData.poll(10, TimeUnit.SECONDS);
// poll with a timeout so we have an
// opportunity to check the running condition
// ... we could interrupt the thread as well, but
// as we enter third party code it's difficult to guarantee it will deal
// well with being interrupted
var data = domainData.poll(1, TimeUnit.SECONDS);
if (data == null)
continue;
String id = data.id;
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id))
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
logger.warn("Skipping already logged item {}", id);
continue;
}
writer.write(data);
currentWriter.write(data);
workLog.logItem(id);
@ -72,10 +84,12 @@ public class ConverterWriter implements AutoCloseable {
return false;
}
// order matters here
writer.close();
currentWriter.close();
workLog.logFinishedBatch();
writer = new ConverterBatchWriter(basePath, workLog.getBatchNumber());
logger.info("Switching to batch {}", workLog.getBatchNumber());
currentWriter = new ConverterBatchWriter(basePath, workLog.getBatchNumber());
return true;
}
@ -86,7 +100,7 @@ public class ConverterWriter implements AutoCloseable {
workerThread.join();
// order matters here
writer.close();
currentWriter.close();
workLog.logFinishedBatch();
}
}
@ -105,17 +119,17 @@ class IntervalAction {
/** Execute the provided action if enough time has passed
* since the last successful invocation */
public void tick() {
var now = Instant.now();
if (nextActionInstant == null) {
nextActionInstant = Instant.now().plus(interval);
nextActionInstant = now.plus(interval);
return;
}
if (Instant.now().isBefore(nextActionInstant))
return;
try {
if (action.call()) {
nextActionInstant = Instant.now().plus(interval);
if (now.isAfter(nextActionInstant)
&& action.call())
{
nextActionInstant = now.plus(interval);
}
}
catch (Exception ex) {

View File

@ -1,4 +1,4 @@
package nu.marginalia.loading.documents;
package nu.marginalia.loading;
import com.google.inject.Inject;
import com.google.inject.Singleton;

View File

@ -9,7 +9,6 @@ import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.documents.LoaderIndexJournalWriter;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.loading.links.DomainLinksLoaderService;
@ -26,8 +25,11 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
@ -106,12 +108,20 @@ public class LoaderMain {
validBatchCount);
try {
linksService
.loadLinks(domainIdRegistry, inputDataDir, validBatchCount);
keywordLoaderService
.loadKeywords(domainIdRegistry, inputDataDir, validBatchCount);
documentLoaderService
.loadDocuments(domainIdRegistry, inputDataDir, validBatchCount);
var results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputDataDir, validBatchCount),
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputDataDir, validBatchCount),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputDataDir, validBatchCount)
)
);
for (var result : results) {
if (result.state() == Future.State.FAILED) {
throw result.exceptionNow();
}
}
instructions.ok();
}
@ -125,7 +135,6 @@ public class LoaderMain {
heartbeat.shutDown();
}
System.exit(0);
}

View File

@ -11,6 +11,9 @@ import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.processed.DocumentRecordMetadataProjection;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
@ -20,6 +23,8 @@ import java.util.List;
@Singleton
public class DocumentLoaderService {
private static final Logger logger = LoggerFactory.getLogger(DocumentLoaderService.class);
private final LinkdbWriter linkdbWriter;
@Inject
@ -27,15 +32,30 @@ public class DocumentLoaderService {
this.linkdbWriter = linkdbWriter;
}
public void loadDocuments(DomainIdRegistry domainIdRegistry,
public boolean loadDocuments(
DomainIdRegistry domainIdRegistry,
ProcessHeartbeat processHeartbeat,
Path processedDataPathBase,
int untilBatch)
throws IOException, SQLException
{
var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch);
for (var file : documentFiles) {
loadDocumentsFromFile(domainIdRegistry, file);
try (var taskHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("DOCUMENTS")) {
int processed = 0;
for (var file : documentFiles) {
taskHeartbeat.progress("LOAD", processed++, documentFiles.size());
loadDocumentsFromFile(domainIdRegistry, file);
}
taskHeartbeat.progress("LOAD", processed, documentFiles.size());
}
logger.info("Finished");
return true;
}
private void loadDocumentsFromFile(DomainIdRegistry domainIdRegistry, Path file)
@ -45,6 +65,8 @@ public class DocumentLoaderService {
LinkdbLoader loader = new LinkdbLoader(domainIdRegistry)
)
{
logger.info("Loading document meta from {}", file);
stream.forEach(loader::accept);
}
}

View File

@ -5,15 +5,20 @@ import com.google.inject.Singleton;
import nu.marginalia.io.processed.DocumentRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.loading.LoaderIndexJournalWriter;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.processed.DocumentRecordKeywordsProjection;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
@Singleton
public class KeywordLoaderService {
private static final Logger logger = LoggerFactory.getLogger(KeywordLoaderService.class);
private final LoaderIndexJournalWriter writer;
@Inject
@ -21,17 +26,33 @@ public class KeywordLoaderService {
this.writer = writer;
}
public void loadKeywords(DomainIdRegistry domainIdRegistry,
public boolean loadKeywords(DomainIdRegistry domainIdRegistry,
ProcessHeartbeat heartbeat,
Path processedDataPathBase,
int untilBatch) throws IOException {
var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch);
for (var file : documentFiles) {
loadKeywordsFromFile(domainIdRegistry, file);
try (var task = heartbeat.createAdHocTaskHeartbeat("KEYWORDS")) {
var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch);
int processed = 0;
for (var file : documentFiles) {
task.progress("LOAD", processed++, documentFiles.size());
loadKeywordsFromFile(domainIdRegistry, file);
}
task.progress("LOAD", processed, documentFiles.size());
}
logger.info("Finished");
return true;
}
private void loadKeywordsFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException {
try (var stream = DocumentRecordParquetFileReader.streamKeywordsProjection(file)) {
logger.info("Loading keywords from {}", file);
stream.filter(DocumentRecordKeywordsProjection::hasKeywords)
.forEach(proj -> insertKeywords(domainIdRegistry, proj));
}

View File

@ -1,7 +1,5 @@
package nu.marginalia.loading.domains;
import nu.marginalia.model.EdgeDomain;
import java.util.HashMap;
import java.util.Map;
@ -20,15 +18,6 @@ public class DomainIdRegistry {
return id;
}
public int getDomainId(EdgeDomain domainName) {
return getDomainId(domainName.toString());
}
public boolean isKnown(String domainName) {
return domainIds.containsKey(domainName);
}
void add(String domainName, int id) {
domainIds.put(domainName, id);
}

View File

@ -7,6 +7,7 @@ import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -27,14 +28,29 @@ public class DomainLinksLoaderService {
this.dataSource = dataSource;
}
public void loadLinks(DomainIdRegistry domainIdRegistry, Path processedDataPathBase, int untilBatch) throws IOException, SQLException {
public boolean loadLinks(DomainIdRegistry domainIdRegistry,
ProcessHeartbeat heartbeat,
Path processedDataPathBase,
int untilBatch) throws IOException, SQLException {
dropLinkData();
var linkFiles = ProcessedDataFileNames.listDomainLinkFiles(processedDataPathBase, untilBatch);
for (var file : linkFiles) {
loadLinksFromFile(domainIdRegistry, file);
try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS")) {
var linkFiles = ProcessedDataFileNames.listDomainLinkFiles(processedDataPathBase, untilBatch);
int processed = 0;
for (var file : linkFiles) {
task.progress("LOAD", processed++, linkFiles.size());
loadLinksFromFile(domainIdRegistry, file);
}
task.progress("LOAD", processed, linkFiles.size());
}
logger.info("Finished");
return true;
}
private void dropLinkData() throws SQLException {

View File

@ -7,10 +7,10 @@ import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.loader.DbTestUtil;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.junit.jupiter.api.*;
import org.mockito.Mockito;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@ -29,6 +29,7 @@ import static org.junit.jupiter.api.Assertions.*;
@Testcontainers
class DomainLoaderServiceTest {
List<Path> toDelete = new ArrayList<>();
ProcessHeartbeat heartbeat;
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
@ -38,6 +39,15 @@ class DomainLoaderServiceTest {
.withInitScript("db/migration/V23_06_0_000__base.sql")
.withNetworkAliases("mariadb");
@BeforeEach
public void setUp() {
heartbeat = Mockito.mock(ProcessHeartbeat.class);
Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn(
Mockito.mock(ProcessAdHocTaskHeartbeat.class)
);
}
@AfterEach
public void tearDown() throws IOException {
for (var path : Lists.reverse(toDelete)) {

View File

@ -8,9 +8,14 @@ import nu.marginalia.loader.DbTestUtil;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
@ -27,6 +32,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
@Testcontainers
class DomainLinksLoaderServiceTest {
List<Path> toDelete = new ArrayList<>();
ProcessHeartbeat heartbeat;
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
@ -36,6 +42,15 @@ class DomainLinksLoaderServiceTest {
.withInitScript("db/migration/V23_06_0_000__base.sql")
.withNetworkAliases("mariadb");
@BeforeEach
public void setUp() {
heartbeat = Mockito.mock(ProcessHeartbeat.class);
Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn(
Mockito.mock(ProcessAdHocTaskHeartbeat.class)
);
}
@AfterEach
public void tearDown() throws IOException {
for (var path : Lists.reverse(toDelete)) {
@ -87,7 +102,7 @@ class DomainLinksLoaderServiceTest {
var domainRegistry = domainService.getOrCreateDomainIds(workDir, 2);
var dls = new DomainLinksLoaderService(dataSource);
dls.loadLinks(domainRegistry, workDir, 2);
dls.loadLinks(domainRegistry, heartbeat, workDir, 2);
Map<Integer, Set<Integer>> expected = new HashMap<>();
Map<Integer, Set<Integer>> actual = new HashMap<>();

View File

@ -5,7 +5,7 @@ import nu.marginalia.db.storage.model.FileStorage;
import nu.marginalia.db.storage.model.FileStorageType;
import nu.marginalia.index.journal.reader.IndexJournalReaderSingleFile;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.loading.documents.LoaderIndexJournalWriter;
import nu.marginalia.loading.LoaderIndexJournalWriter;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.index.journal.IndexJournalFileNames;
import org.junit.jupiter.api.AfterEach;