(*) Refactor MQ and MQSM
This commit is contained in:
parent
e618aa34e9
commit
bca4bbb6c8
30
code/api/process-mqapi/build.gradle
Normal file
30
code/api/process-mqapi/build.gradle
Normal file
@ -0,0 +1,30 @@
|
||||
plugins {
|
||||
id 'java'
|
||||
id "io.freefair.lombok" version "5.3.3.3"
|
||||
|
||||
id 'jvm-test-suite'
|
||||
}
|
||||
|
||||
java {
|
||||
toolchain {
|
||||
languageVersion.set(JavaLanguageVersion.of(17))
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:db')
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
testImplementation libs.bundles.junit
|
||||
testImplementation libs.mockito
|
||||
}
|
||||
|
||||
test {
|
||||
useJUnitPlatform()
|
||||
}
|
||||
|
||||
task fastTests(type: Test) {
|
||||
useJUnitPlatform {
|
||||
excludeTags "slow"
|
||||
}
|
||||
}
|
@ -1,6 +1,7 @@
|
||||
package nu.marginalia.converting.mqapi;
|
||||
package nu.marginalia.mqapi;
|
||||
|
||||
public class ConverterInboxNames {
|
||||
public class ProcessInboxNames {
|
||||
public static final String CONVERTER_INBOX = "converter";
|
||||
public static final String LOADER_INBOX = "loader";
|
||||
public static final String CRAWLER_INBOX = "crawler";
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.converting.mqapi;
|
||||
package nu.marginalia.mqapi.converting;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import nu.marginalia.db.storage.model.FileStorageId;
|
@ -1,8 +1,10 @@
|
||||
package nu.marginalia.crawling.mqapi;
|
||||
package nu.marginalia.mqapi.crawling;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import nu.marginalia.db.storage.model.FileStorageId;
|
||||
|
||||
/** A request to start a crawl */
|
||||
@AllArgsConstructor
|
||||
public class CrawlRequest {
|
||||
FileStorageId specStorage;
|
||||
FileStorageId crawlStorage;
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.converting.mqapi;
|
||||
package nu.marginalia.mqapi.loading;
|
||||
|
||||
import lombok.AllArgsConstructor;
|
||||
import nu.marginalia.db.storage.model.FileStorageId;
|
||||
@ -6,5 +6,4 @@ import nu.marginalia.db.storage.model.FileStorageId;
|
||||
@AllArgsConstructor
|
||||
public class LoadRequest {
|
||||
public FileStorageId processedDataStorage;
|
||||
|
||||
}
|
@ -1,4 +1,10 @@
|
||||
# Core Service Clients
|
||||
# Clients
|
||||
|
||||
## Core Services
|
||||
|
||||
* [assistant-api](assistant-api/)
|
||||
* [search-api](search-api/)
|
||||
* [index-api](index-api/)
|
||||
|
||||
These are clients for the [core services](../services-core/), along with what models
|
||||
are necessary for speaking to them. They each implement the abstract client classes from
|
||||
@ -8,3 +14,10 @@ All that is necessary is to `@Inject` them into the constructor and then
|
||||
requests can be sent.
|
||||
|
||||
**Note:** If you are looking for the public API, it's handled by the api service in [services-satellite/api-service](../services-satellite/api-service).
|
||||
|
||||
## MQ-API Process API
|
||||
|
||||
[process-mqapi](process-mqapi/) defines requests and inboxes for the message queue based API used
|
||||
for interacting with processes.
|
||||
|
||||
See [common/message-queue](../common/message-queue) and [services-satellite/control-service](../services-satellite/control-service).
|
@ -1,3 +1,8 @@
|
||||
package nu.marginalia.db.storage.model;
|
||||
|
||||
public record FileStorageBaseId(long id) {}
|
||||
public record FileStorageBaseId(long id) {
|
||||
|
||||
public String toString() {
|
||||
return Long.toString(id);
|
||||
}
|
||||
}
|
||||
|
@ -4,4 +4,8 @@ public record FileStorageId(long id) {
|
||||
public static FileStorageId of(int storageId) {
|
||||
return new FileStorageId(storageId);
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return Long.toString(id);
|
||||
}
|
||||
}
|
||||
|
@ -5,4 +5,79 @@ as well as a finite state machine library backed by the
|
||||
message queue that enables long-running tasks that outlive
|
||||
the execution lifespan of the involved processes.
|
||||
|
||||
![Message States](msgstate.svg)
|
||||
![Message States](msgstate.svg)
|
||||
|
||||
The message queue is interacted with via the Inbox and Outbox classes.
|
||||
|
||||
There are three types of inboxes;
|
||||
|
||||
Name|Description
|
||||
---|---
|
||||
MqSingleShotInbox|A single message is received and then the inbox is closed.
|
||||
MqAsynchronousInbox|Messages are received asynchronously and can be processed in parallel.
|
||||
MqSynchronousInbox|Messages are received synchronously and will be processed in order; message processing can be aborted.
|
||||
|
||||
A single outbox implementation exists, the `MqOutbox`, which implements multiple message sending strategies,
|
||||
including blocking and asynchronous paradigms. Lower level access to the message queue itself is provided by the `MqPersistence` class.
|
||||
|
||||
The inbox implementations as well as the outbox can be constructed via the `MessageQueueFactory` class.
|
||||
|
||||
## Message Queue State Machine (MQSM)
|
||||
|
||||
The MQSM is a finite state machine that is backed by the message queue. The machine itself
|
||||
is defined through a class that extends the 'AbstractStateGraph'; with state transitions and
|
||||
names defined as implementations.
|
||||
|
||||
Example:
|
||||
|
||||
```java
|
||||
class ExampleStateMachine extends AbstractStateGraph {
|
||||
|
||||
@GraphState(name = "INITIAL", next="GREET")
|
||||
public void initial() {
|
||||
return "World"; // passed to the next state
|
||||
}
|
||||
|
||||
@GraphState(name = "GREET", next="COUNT-TO-FIVE")
|
||||
public void greet(String name) {
|
||||
System.out.println("Hello " + name);
|
||||
}
|
||||
|
||||
@GraphState(name = "COUNT-TO-FIVE", next="END")
|
||||
public void countToFive(Integer value) {
|
||||
// value is passed from the previous state, since greet didn't pass a value,
|
||||
// null will be the default.
|
||||
|
||||
if (null == value) {
|
||||
// jumps to the current state with a value of 0
|
||||
transition("COUNT-TO-FIVE", 0);
|
||||
}
|
||||
|
||||
|
||||
System.out.println(++value);
|
||||
if (value < 5) {
|
||||
// Loops the current state until value = 5
|
||||
transition("COUNT-TO-FIVE", value);
|
||||
}
|
||||
|
||||
if (value > 5) {
|
||||
// demonstrates an error condition
|
||||
error("Illegal value");
|
||||
}
|
||||
|
||||
// Default transition is to END
|
||||
}
|
||||
|
||||
@GraphState(name="END")
|
||||
public void end() {
|
||||
System.out.println("Done");
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
Each method should ideally be idempotent, or at least be able to handle being called multiple times.
|
||||
It can not be assumed that the states are invoked within the same process, or even on the same machine,
|
||||
on the same day, etc.
|
||||
|
||||
The usual considerations for writing deterministic Java code are advisable unless unavoidable;
|
||||
all state must be local, don't iterate over hash maps, etc.
|
@ -28,6 +28,11 @@ public class StateFactory {
|
||||
|
||||
@Override
|
||||
public StateTransition next(String message) {
|
||||
|
||||
if (message.equals("")) {
|
||||
return logic.apply(null);
|
||||
}
|
||||
|
||||
return logic.apply(gson.fromJson(message, param));
|
||||
}
|
||||
|
||||
@ -72,6 +77,11 @@ public class StateFactory {
|
||||
}
|
||||
|
||||
public StateTransition transition(String state, Object message) {
|
||||
|
||||
if (null == message) {
|
||||
return StateTransition.to(state);
|
||||
}
|
||||
|
||||
return StateTransition.to(state, gson.toJson(message));
|
||||
}
|
||||
|
||||
|
@ -186,8 +186,16 @@ public class StateMachine {
|
||||
if (resumeState.resumeBehavior().equals(ResumeBehavior.ERROR)) {
|
||||
// The message is acknowledged, but the state does not support resuming
|
||||
smOutbox.notify(expectedMessage.id, "ERROR", "Illegal resumption from ACK'ed state " + message.function());
|
||||
} else {
|
||||
}
|
||||
else if (resumeState.resumeBehavior().equals(ResumeBehavior.RESTART)) {
|
||||
this.state = resumeState;
|
||||
|
||||
// The message is already acknowledged, we flag it as dead and then send an identical message
|
||||
smOutbox.flagAsDead(message.msgId());
|
||||
expectedMessage = ExpectedMessage.responseTo(message);
|
||||
smOutbox.notify(message.msgId(), "INITIAL", "");
|
||||
}
|
||||
else {
|
||||
this.state = resumeState;
|
||||
|
||||
// The message is already acknowledged, we flag it as dead and then send an identical message
|
||||
|
@ -4,5 +4,7 @@ public enum ResumeBehavior {
|
||||
/** Retry the state on resume */
|
||||
RETRY,
|
||||
/** Jump to ERROR on resume if the message has been acknowledged */
|
||||
ERROR
|
||||
ERROR,
|
||||
/** Jump to INITIAL on resume */
|
||||
RESTART
|
||||
}
|
||||
|
@ -0,0 +1,98 @@
|
||||
package nu.marginalia.mqsm;
|
||||
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.zaxxer.hikari.HikariConfig;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.MqTestUtil;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mqsm.graph.AbstractStateGraph;
|
||||
import nu.marginalia.mqsm.graph.GraphState;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.junit.jupiter.api.parallel.Execution;
|
||||
import org.testcontainers.containers.MariaDBContainer;
|
||||
import org.testcontainers.junit.jupiter.Container;
|
||||
import org.testcontainers.junit.jupiter.Testcontainers;
|
||||
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
|
||||
|
||||
@Tag("slow")
|
||||
@Testcontainers
|
||||
@Execution(SAME_THREAD)
|
||||
public class StateMachineNullTest {
|
||||
@Container
|
||||
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
|
||||
.withDatabaseName("WMSA_prod")
|
||||
.withUsername("wmsa")
|
||||
.withPassword("wmsa")
|
||||
.withInitScript("sql/current/12-message-queue.sql")
|
||||
.withNetworkAliases("mariadb");
|
||||
|
||||
static HikariDataSource dataSource;
|
||||
static MqPersistence persistence;
|
||||
static MessageQueueFactory messageQueueFactory;
|
||||
private String inboxId;
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
inboxId = UUID.randomUUID().toString();
|
||||
}
|
||||
@BeforeAll
|
||||
public static void setUpAll() {
|
||||
HikariConfig config = new HikariConfig();
|
||||
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
|
||||
config.setUsername("wmsa");
|
||||
config.setPassword("wmsa");
|
||||
|
||||
dataSource = new HikariDataSource(config);
|
||||
persistence = new MqPersistence(dataSource);
|
||||
messageQueueFactory = new MessageQueueFactory(persistence);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void tearDownAll() {
|
||||
dataSource.close();
|
||||
}
|
||||
|
||||
public static class TestGraph extends AbstractStateGraph {
|
||||
public TestGraph(StateFactory stateFactory) {
|
||||
super(stateFactory);
|
||||
}
|
||||
|
||||
@GraphState(name = "INITIAL", next = "GREET")
|
||||
public void initial() {}
|
||||
|
||||
@GraphState(name = "GREET", next = "END")
|
||||
public void greet(String message) {
|
||||
if (null == message) {
|
||||
System.out.println("Hello, null!");
|
||||
return;
|
||||
}
|
||||
Assertions.fail("Should not be called");
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStateGraphNullSerialization() throws Exception {
|
||||
var stateFactory = new StateFactory(new GsonBuilder().create());
|
||||
var graph = new TestGraph(stateFactory);
|
||||
|
||||
|
||||
var sm = new StateMachine(messageQueueFactory, inboxId, UUID.randomUUID(), graph);
|
||||
sm.registerStates(graph);
|
||||
|
||||
sm.init();
|
||||
|
||||
sm.join(2, TimeUnit.SECONDS);
|
||||
sm.stop();
|
||||
|
||||
MqTestUtil.getMessages(dataSource, inboxId).forEach(System.out::println);
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -26,7 +26,9 @@ dependencies {
|
||||
|
||||
implementation project(':third-party:porterstemmer')
|
||||
implementation project(':third-party:count-min-sketch')
|
||||
|
||||
implementation project(':code:api:index-api')
|
||||
implementation project(':code:api:process-mqapi')
|
||||
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:db')
|
||||
|
@ -29,7 +29,7 @@ import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import static nu.marginalia.converting.mqapi.ConverterInboxNames.CONVERTER_INBOX;
|
||||
import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
|
||||
|
||||
public class ConverterMain {
|
||||
|
||||
@ -176,10 +176,10 @@ public class ConverterMain {
|
||||
|
||||
var inbox = messageQueueFactory.createSingleShotInbox(CONVERTER_INBOX, UUID.randomUUID());
|
||||
|
||||
var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.ConvertRequest.class.getSimpleName());
|
||||
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.converting.ConvertRequest.class.getSimpleName());
|
||||
var msg = msgOpt.orElseThrow(() -> new RuntimeException("No message received"));
|
||||
|
||||
var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.ConvertRequest.class);
|
||||
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);
|
||||
|
||||
var crawlData = fileStorageService.getStorage(request.crawlStorage);
|
||||
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
||||
|
@ -2,26 +2,13 @@ package nu.marginalia.converting;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Provides;
|
||||
import com.google.inject.Singleton;
|
||||
import com.google.inject.name.Names;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.LanguageModels;
|
||||
import nu.marginalia.ProcessConfiguration;
|
||||
import nu.marginalia.WmsaHome;
|
||||
import nu.marginalia.converting.mqapi.ConvertRequest;
|
||||
import nu.marginalia.db.storage.FileStorageService;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import plan.CrawlPlan;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import plan.CrawlPlanLoader;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.sql.SQLException;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ConverterModule extends AbstractModule {
|
||||
|
||||
|
@ -19,7 +19,7 @@ tasks.distZip.enabled = false
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:process')
|
||||
|
||||
implementation project(':code:api:process-mqapi')
|
||||
implementation project(':code:api:index-api')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:db')
|
||||
|
@ -7,7 +7,6 @@ import com.google.inject.Injector;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.db.storage.FileStorageService;
|
||||
import nu.marginalia.db.storage.model.FileStorage;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
import nu.marginalia.mq.inbox.MqInboxResponse;
|
||||
@ -30,7 +29,7 @@ import java.util.UUID;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static nu.marginalia.converting.mqapi.ConverterInboxNames.LOADER_INBOX;
|
||||
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
|
||||
|
||||
public class LoaderMain {
|
||||
private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);
|
||||
@ -215,16 +214,16 @@ public class LoaderMain {
|
||||
|
||||
var inbox = messageQueueFactory.createSingleShotInbox(LOADER_INBOX, UUID.randomUUID());
|
||||
|
||||
var msgOpt = getMessage(inbox, nu.marginalia.converting.mqapi.LoadRequest.class.getSimpleName());
|
||||
var msgOpt = getMessage(inbox, nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName());
|
||||
if (msgOpt.isEmpty())
|
||||
throw new RuntimeException("No instruction received in inbox");
|
||||
var msg = msgOpt.get();
|
||||
|
||||
if (!nu.marginalia.converting.mqapi.LoadRequest.class.getSimpleName().equals(msg.function())) {
|
||||
if (!nu.marginalia.mqapi.loading.LoadRequest.class.getSimpleName().equals(msg.function())) {
|
||||
throw new RuntimeException("Unexpected message in inbox: " + msg);
|
||||
}
|
||||
|
||||
var request = gson.fromJson(msg.payload(), nu.marginalia.converting.mqapi.LoadRequest.class);
|
||||
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.loading.LoadRequest.class);
|
||||
|
||||
var processData = fileStorageService.getStorage(request.processedDataStorage);
|
||||
|
||||
|
@ -22,6 +22,8 @@ tasks.distZip.enabled = false
|
||||
apply from: "$rootProject.projectDir/docker-service.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation libs.bundles.gson
|
||||
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:service')
|
||||
@ -30,10 +32,9 @@ dependencies {
|
||||
implementation project(':code:common:message-queue')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:service-client')
|
||||
implementation project(':code:process-models:converting-model')
|
||||
implementation project(':code:process-models:crawling-model')
|
||||
implementation project(':code:api:search-api')
|
||||
implementation project(':code:api:index-api')
|
||||
implementation project(':code:api:process-mqapi')
|
||||
|
||||
|
||||
implementation libs.lombok
|
||||
@ -43,11 +44,11 @@ dependencies {
|
||||
implementation libs.prometheus
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
|
||||
implementation libs.trove
|
||||
implementation libs.spark
|
||||
implementation libs.fastutil
|
||||
implementation libs.commons.io
|
||||
implementation libs.bundles.gson
|
||||
implementation libs.bundles.mariadb
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
|
@ -77,38 +77,31 @@ public class ControlService extends Service {
|
||||
(req, rsp) -> Map.of("storage", controlFileStorageService.getStorageList()),
|
||||
(map) -> storageRenderer.render((Map<?, ?>) map));
|
||||
|
||||
final HtmlRedirect redirectToServices = new HtmlRedirect("/services");
|
||||
final HtmlRedirect redirectToProcesses = new HtmlRedirect("/processes");
|
||||
final HtmlRedirect redirectToStorage = new HtmlRedirect("/storage");
|
||||
|
||||
Spark.post("/public/fsms/:fsm/start", (req, rsp) -> {
|
||||
controlFSMs.start(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
|
||||
return """
|
||||
<?doctype html>
|
||||
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
|
||||
""";
|
||||
});
|
||||
return "";
|
||||
}, redirectToProcesses);
|
||||
|
||||
Spark.post("/public/fsms/:fsm/stop", (req, rsp) -> {
|
||||
controlFSMs.stop(ControlProcess.valueOf(req.params("fsm").toUpperCase()));
|
||||
return """
|
||||
<?doctype html>
|
||||
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
|
||||
""";
|
||||
});
|
||||
return "";
|
||||
}, redirectToProcesses);
|
||||
|
||||
// TODO: This should be a POST
|
||||
Spark.get("/public/repartition", (req, rsp) -> {
|
||||
controlFSMs.start(ControlProcess.REPARTITION_REINDEX);
|
||||
return """
|
||||
<?doctype html>
|
||||
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
|
||||
""";
|
||||
});
|
||||
return "";
|
||||
} , redirectToProcesses);
|
||||
|
||||
// TODO: This should be a POST
|
||||
Spark.get("/public/reconvert/:fid", (req, rsp) -> {
|
||||
Spark.post("/public/storage/:fid/process", (req, rsp) -> {
|
||||
controlFSMs.start(ControlProcess.RECONVERT_LOAD, FileStorageId.of(Integer.parseInt(req.params("fid"))));
|
||||
return """
|
||||
<?doctype html>
|
||||
<html><head><meta http-equiv="refresh" content="0;URL='/processes'" /></head></html>
|
||||
""";
|
||||
});
|
||||
return "";
|
||||
}, redirectToProcesses);
|
||||
Spark.post("/public/storage/:fid/delete", controlFileStorageService::flagFileForDeletionRequest, redirectToStorage);
|
||||
|
||||
Spark.get("/public/:resource", this::serveStatic);
|
||||
|
||||
|
@ -0,0 +1,19 @@
|
||||
package nu.marginalia.control;
|
||||
|
||||
import spark.ResponseTransformer;
|
||||
|
||||
public class HtmlRedirect implements ResponseTransformer {
|
||||
private final String html;
|
||||
|
||||
public HtmlRedirect(String destination) {
|
||||
this.html = """
|
||||
<?doctype html>
|
||||
<html><head><meta http-equiv="refresh" content="0;URL='%s'" /></head></html>
|
||||
""".formatted(destination);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String render(Object any) throws Exception {
|
||||
return html;
|
||||
}
|
||||
}
|
@ -3,7 +3,7 @@ package nu.marginalia.control.fsm.monitor;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.control.svc.ProcessService;
|
||||
import nu.marginalia.converting.mqapi.ConverterInboxNames;
|
||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mqsm.StateFactory;
|
||||
|
||||
@ -15,7 +15,7 @@ public class ConverterMonitorFSM extends AbstractProcessSpawnerFSM {
|
||||
public ConverterMonitorFSM(StateFactory stateFactory,
|
||||
MqPersistence persistence,
|
||||
ProcessService processService) {
|
||||
super(stateFactory, persistence, processService, ConverterInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER);
|
||||
super(stateFactory, persistence, processService, ProcessInboxNames.CONVERTER_INBOX, ProcessService.ProcessId.CONVERTER);
|
||||
}
|
||||
|
||||
|
||||
|
@ -3,7 +3,7 @@ package nu.marginalia.control.fsm.monitor;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.control.svc.ProcessService;
|
||||
import nu.marginalia.converting.mqapi.ConverterInboxNames;
|
||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.mqsm.StateFactory;
|
||||
|
||||
@ -17,7 +17,7 @@ public class LoaderMonitorFSM extends AbstractProcessSpawnerFSM {
|
||||
ProcessService processService) {
|
||||
|
||||
super(stateFactory, persistence, processService,
|
||||
ConverterInboxNames.LOADER_INBOX,
|
||||
ProcessInboxNames.LOADER_INBOX,
|
||||
ProcessService.ProcessId.LOADER);
|
||||
}
|
||||
|
||||
|
@ -8,8 +8,8 @@ import lombok.NoArgsConstructor;
|
||||
import lombok.With;
|
||||
import nu.marginalia.control.svc.ProcessOutboxFactory;
|
||||
import nu.marginalia.control.svc.ProcessService;
|
||||
import nu.marginalia.converting.mqapi.ConvertRequest;
|
||||
import nu.marginalia.converting.mqapi.LoadRequest;
|
||||
import nu.marginalia.mqapi.converting.ConvertRequest;
|
||||
import nu.marginalia.mqapi.loading.LoadRequest;
|
||||
import nu.marginalia.db.storage.FileStorageService;
|
||||
import nu.marginalia.db.storage.model.FileStorageBaseType;
|
||||
import nu.marginalia.db.storage.model.FileStorageId;
|
||||
|
@ -1,9 +1,10 @@
|
||||
package nu.marginalia.control.model;
|
||||
|
||||
import nu.marginalia.db.storage.model.FileStorage;
|
||||
import nu.marginalia.db.storage.model.FileStorageBase;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public record FileStorageBaseWithStorage(FileStorageBase base, List<FileStorage> storage) {
|
||||
public record FileStorageBaseWithStorage(FileStorageBase base,
|
||||
List<FileStorageWithActions> storage)
|
||||
{
|
||||
}
|
||||
|
@ -0,0 +1,16 @@
|
||||
package nu.marginalia.control.model;
|
||||
|
||||
import nu.marginalia.db.storage.model.FileStorage;
|
||||
import nu.marginalia.db.storage.model.FileStorageType;
|
||||
|
||||
public record FileStorageWithActions(FileStorage storage) {
|
||||
public boolean isLoadable() {
|
||||
return storage.type() == FileStorageType.PROCESSED_DATA;
|
||||
}
|
||||
public boolean isConvertible() {
|
||||
return storage.type() == FileStorageType.CRAWL_DATA;
|
||||
}
|
||||
public boolean isDeletable() {
|
||||
return storage.type() == FileStorageType.PROCESSED_DATA;
|
||||
}
|
||||
}
|
@ -5,6 +5,7 @@ import com.google.inject.Singleton;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.control.model.FileStorageBaseWithStorage;
|
||||
import nu.marginalia.control.model.FileStorageWithActions;
|
||||
import nu.marginalia.control.model.ProcessHeartbeat;
|
||||
import nu.marginalia.control.model.ServiceHeartbeat;
|
||||
import nu.marginalia.db.storage.FileStorageService;
|
||||
@ -12,6 +13,8 @@ import nu.marginalia.db.storage.model.FileStorage;
|
||||
import nu.marginalia.db.storage.model.FileStorageBase;
|
||||
import nu.marginalia.db.storage.model.FileStorageBaseId;
|
||||
import nu.marginalia.db.storage.model.FileStorageId;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
@ -30,10 +33,24 @@ public class ControlFileStorageService {
|
||||
this.fileStorageService = fileStorageService;
|
||||
}
|
||||
|
||||
public Object flagFileForDeletionRequest(Request request, Response response) throws SQLException {
|
||||
FileStorageId fid = new FileStorageId(Long.parseLong(request.params(":fid")));
|
||||
flagFileForDeletion(fid);
|
||||
return "";
|
||||
}
|
||||
|
||||
public void flagFileForDeletion(FileStorageId id) throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var flagStmt = conn.prepareStatement("UPDATE FILE_STORAGE SET DO_PURGE = TRUE WHERE ID = ?")) {
|
||||
flagStmt.setLong(1, id.id());
|
||||
flagStmt.executeUpdate();
|
||||
}
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
public List<FileStorageBaseWithStorage> getStorageList() {
|
||||
Map<FileStorageBaseId, FileStorageBase> fileStorageBaseByBaseId = new HashMap<>();
|
||||
Map<FileStorageBaseId, List<FileStorage>> fileStoragByBaseId = new HashMap<>();
|
||||
Map<FileStorageBaseId, List<FileStorageWithActions>> fileStoragByBaseId = new HashMap<>();
|
||||
|
||||
List<FileStorageId> storageIds = new ArrayList<>();
|
||||
|
||||
@ -48,12 +65,15 @@ public class ControlFileStorageService {
|
||||
for (var id : storageIds) {
|
||||
var storage = fileStorageService.getStorage(id);
|
||||
fileStorageBaseByBaseId.computeIfAbsent(storage.base().id(), k -> storage.base());
|
||||
fileStoragByBaseId.computeIfAbsent(storage.base().id(), k -> new ArrayList<>()).add(storage);
|
||||
fileStoragByBaseId.computeIfAbsent(storage.base().id(), k -> new ArrayList<>()).add(new FileStorageWithActions(storage));
|
||||
}
|
||||
|
||||
List<FileStorageBaseWithStorage> result = new ArrayList<>();
|
||||
for (var baseId : fileStorageBaseByBaseId.keySet()) {
|
||||
result.add(new FileStorageBaseWithStorage(fileStorageBaseByBaseId.get(baseId), fileStoragByBaseId.get(baseId)));
|
||||
result.add(new FileStorageBaseWithStorage(fileStorageBaseByBaseId.get(baseId),
|
||||
fileStoragByBaseId.get(baseId)
|
||||
|
||||
));
|
||||
}
|
||||
|
||||
return result;
|
||||
|
@ -2,7 +2,7 @@ package nu.marginalia.control.svc;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.converting.mqapi.ConverterInboxNames;
|
||||
import nu.marginalia.mqapi.ProcessInboxNames;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.service.server.BaseServiceParams;
|
||||
@ -19,9 +19,9 @@ public class ProcessOutboxFactory {
|
||||
}
|
||||
|
||||
public MqOutbox createConverterOutbox() {
|
||||
return new MqOutbox(persistence, ConverterInboxNames.CONVERTER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid());
|
||||
return new MqOutbox(persistence, ProcessInboxNames.CONVERTER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid());
|
||||
}
|
||||
public MqOutbox createLoaderOutbox() {
|
||||
return new MqOutbox(persistence, ConverterInboxNames.LOADER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid());
|
||||
return new MqOutbox(persistence, ProcessInboxNames.LOADER_INBOX, params.configuration.serviceName(), params.configuration.instanceUuid());
|
||||
}
|
||||
}
|
||||
|
@ -33,10 +33,26 @@
|
||||
</tr>
|
||||
{{#each storage}}
|
||||
<tr>
|
||||
<td></td>
|
||||
<td>{{type}}</td>
|
||||
<td colspan="2">{{path}}</td>
|
||||
<td>{{description}}</td>
|
||||
<td>
|
||||
{{#if isLoadable}}
|
||||
<form method="post" action="/storage/{{storage.id}}/load">
|
||||
<button type="submit">Load</button>
|
||||
</form>
|
||||
{{/if}}
|
||||
{{#if isConvertible}}
|
||||
<form method="post" action="/storage/{{storage.id}}/process">
|
||||
<button type="submit">Process</button>
|
||||
</form>
|
||||
{{/if}}
|
||||
{{#if isDeletable}}
|
||||
<form method="post" action="/storage/{{storage.id}}/delete" onsubmit="return confirm('Confirm deletion of {{storage.path}}')">
|
||||
<button type="submit">Delete</button>
|
||||
</form>
|
||||
{{/if}}
|
||||
</td>
|
||||
<td>{{storage.type}}</td>
|
||||
<td colspan="2">{{storage.path}}</td>
|
||||
<td>{{storage.description}}</td>
|
||||
</tr>
|
||||
{{/each}}
|
||||
{{/each}}
|
||||
|
@ -44,6 +44,7 @@ include 'code:features-index:domain-ranking'
|
||||
include 'code:api:search-api'
|
||||
include 'code:api:index-api'
|
||||
include 'code:api:assistant-api'
|
||||
include 'code:api:process-mqapi'
|
||||
|
||||
include 'code:common:service-discovery'
|
||||
include 'code:common:service-client'
|
||||
|
Loading…
Reference in New Issue
Block a user