(control, WIP) MQFSM and ProcessService are sitting in a tree

We're spawning processes from the MSFSM in control service now!
This commit is contained in:
Viktor Lofgren 2023-07-11 17:08:43 +02:00
parent 3c7c77fe21
commit 77261a38cd
15 changed files with 296 additions and 18 deletions

View File

@ -9,14 +9,29 @@ version 'SNAPSHOT'
compileJava.options.encoding = "UTF-8" compileJava.options.encoding = "UTF-8"
compileTestJava.options.encoding = "UTF-8" compileTestJava.options.encoding = "UTF-8"
task dist(type: Copy) { tasks.register('dist', Copy) {
from subprojects.collect { it.tasks.withType(Tar) } from subprojects.collect { it.tasks.withType(Tar) }
into "$buildDir/dist" into "$buildDir/dist"
}
doLast {
copy {
from tarTree("$buildDir/dist/converter-process.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/crawler-process.tar")
into "$projectDir/run/dist/"
}
copy {
from tarTree("$buildDir/dist/loader-process.tar")
into "$projectDir/run/dist/"
}
}
}
idea { idea {
module { module {
excludeDirs.add(file("$projectDir/run/model")) excludeDirs.add(file("$projectDir/run/model"))
excludeDirs.add(file("$projectDir/run/dist"))
excludeDirs.add(file("$projectDir/run/samples")) excludeDirs.add(file("$projectDir/run/samples"))
excludeDirs.add(file("$projectDir/run/db")) excludeDirs.add(file("$projectDir/run/db"))
excludeDirs.add(file("$projectDir/run/logs")) excludeDirs.add(file("$projectDir/run/logs"))

View File

@ -103,6 +103,19 @@ public class StateMachine {
smOutbox.notify(transition.state(), transition.message()); smOutbox.notify(transition.state(), transition.message());
} }
/** Initialize the state machine. */
public void init(String jsonEncodedArgument) throws Exception {
var transition = StateTransition.to("INITIAL", jsonEncodedArgument);
synchronized (this) {
this.state = allStates.get(transition.state());
notifyAll();
}
smInbox.start();
smOutbox.notify(transition.state(), transition.message());
}
/** Resume the state machine from the last known state. */ /** Resume the state machine from the last known state. */
public void resume() throws Exception { public void resume() throws Exception {

View File

@ -108,9 +108,7 @@ public class LoaderMain {
try { try {
AtomicInteger loadTotal = new AtomicInteger(); AtomicInteger loadTotal = new AtomicInteger();
WorkLog.readLog(logFile, entry -> { WorkLog.readLog(logFile, entry -> loadTotal.incrementAndGet());
loadTotal.incrementAndGet();
});
LoaderMain.loadTotal = loadTotal.get(); LoaderMain.loadTotal = loadTotal.get();
AtomicInteger loaded = new AtomicInteger(); AtomicInteger loaded = new AtomicInteger();

View File

@ -28,7 +28,7 @@ public class LoaderModule extends AbstractModule {
bind(ServiceDescriptors.class).toInstance(SearchServiceDescriptors.descriptors); bind(ServiceDescriptors.class).toInstance(SearchServiceDescriptors.descriptors);
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("loader", 0, UUID.randomUUID())); bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("loader", 0, UUID.randomUUID()));
bind(Gson.class).toInstance(createGson()); bind(Gson.class).toProvider(this::createGson);
bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol"))); bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol")));
bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels()); bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels());

View File

@ -21,6 +21,7 @@ public class ControlMain extends MainClass {
Injector injector = Guice.createInjector( Injector injector = Guice.createInjector(
new DatabaseModule(), new DatabaseModule(),
new ControlProcessModule(),
new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Control)); new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Control));
injector.getInstance(ControlMain.class); injector.getInstance(ControlMain.class);

View File

@ -0,0 +1,15 @@
package nu.marginalia.control;
import com.google.inject.AbstractModule;
import com.google.inject.Module;
import com.google.inject.name.Names;
import java.nio.file.Path;
public class ControlProcessModule extends AbstractModule {
@Override
protected void configure() {
String dist = System.getProperty("distPath", System.getProperty("WMSA_HOME") + "/dist/current");
bind(Path.class).annotatedWith(Names.named("distPath")).toInstance(Path.of(dist));
}
}

View File

@ -5,10 +5,7 @@ import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors; import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.process.ControlProcesses; import nu.marginalia.control.process.ControlProcesses;
import nu.marginalia.control.svc.EventLogService; import nu.marginalia.control.svc.*;
import nu.marginalia.control.svc.HeartbeatService;
import nu.marginalia.control.svc.MessageQueueMonitorService;
import nu.marginalia.control.svc.MessageQueueViewService;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.MustacheRenderer;
@ -22,6 +19,7 @@ import spark.Response;
import spark.Spark; import spark.Spark;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -51,7 +49,8 @@ public class ControlService extends Service {
ControlProcesses controlProcesses, ControlProcesses controlProcesses,
StaticResources staticResources, StaticResources staticResources,
MessageQueueViewService messageQueueViewService, MessageQueueViewService messageQueueViewService,
MessageQueueMonitorService messageQueueMonitorService MessageQueueMonitorService messageQueueMonitorService,
ProcessService processService
) throws IOException { ) throws IOException {
super(params); super(params);
@ -84,6 +83,11 @@ public class ControlService extends Service {
controlProcesses.start(ControlProcess.REPARTITION_REINDEX); controlProcesses.start(ControlProcess.REPARTITION_REINDEX);
return "OK"; return "OK";
}); });
// TODO: This should be a POST
Spark.get("/public/reconvert", (req, rsp) -> {
controlProcesses.start(ControlProcess.RECONVERT_LOAD, "/samples/crawl-blogs/plan.yaml");
return "OK";
});
Spark.get("/public/:resource", this::serveStatic); Spark.get("/public/:resource", this::serveStatic);

View File

@ -1,7 +1,8 @@
package nu.marginalia.control.model; package nu.marginalia.control.model;
public enum ControlProcess { public enum ControlProcess {
REPARTITION_REINDEX; REPARTITION_REINDEX,
RECONVERT_LOAD;
public String id() { public String id() {
return "fsm:" + name().toLowerCase(); return "fsm:" + name().toLowerCase();

View File

@ -1,8 +1,10 @@
package nu.marginalia.control.process; package nu.marginalia.control.process;
import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqsm.StateMachine; import nu.marginalia.mqsm.StateMachine;
import nu.marginalia.mqsm.graph.AbstractStateGraph; import nu.marginalia.mqsm.graph.AbstractStateGraph;
@ -17,17 +19,21 @@ import java.util.UUID;
public class ControlProcesses { public class ControlProcesses {
private final MqPersistence persistence; private final MqPersistence persistence;
private final ServiceEventLog eventLog; private final ServiceEventLog eventLog;
private final Gson gson;
public Map<ControlProcess, StateMachine> stateMachines = new HashMap<>(); public Map<ControlProcess, StateMachine> stateMachines = new HashMap<>();
@Inject @Inject
public ControlProcesses(MqPersistence persistence, public ControlProcesses(MqPersistence persistence,
GsonFactory gsonFactory,
BaseServiceParams baseServiceParams, BaseServiceParams baseServiceParams,
RepartitionReindexProcess repartitionReindexProcess RepartitionReindexProcess repartitionReindexProcess,
ReconvertAndLoadProcess reconvertAndLoadProcess
) { ) {
this.persistence = persistence; this.persistence = persistence;
this.eventLog = baseServiceParams.eventLog; this.eventLog = baseServiceParams.eventLog;
this.gson = gsonFactory.get();
register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess); register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess);
register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess);
} }
private void register(ControlProcess process, AbstractStateGraph graph) { private void register(ControlProcess process, AbstractStateGraph graph) {
@ -48,6 +54,12 @@ public class ControlProcesses {
stateMachines.get(process).init(); stateMachines.get(process).init();
} }
public <T> void start(ControlProcess process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
public void resume(ControlProcess process) throws Exception { public void resume(ControlProcess process) throws Exception {
eventLog.logEvent("FSM-RESUME", process.id()); eventLog.logEvent("FSM-RESUME", process.id());
stateMachines.get(process).resume(); stateMachines.get(process).resume();

View File

@ -0,0 +1,77 @@
package nu.marginalia.control.process;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.control.svc.ProcessService;
import nu.marginalia.index.client.IndexClient;
import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqsm.StateFactory;
import nu.marginalia.mqsm.graph.AbstractStateGraph;
import nu.marginalia.mqsm.graph.GraphState;
import nu.marginalia.mqsm.graph.ResumeBehavior;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
@Singleton
public class ReconvertAndLoadProcess extends AbstractStateGraph {
// STATES
private static final String INITIAL = "INITIAL";
private static final String RECONVERT = "RECONVERT";
private static final String LOAD = "LOAD";
private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES";
private static final String END = "END";
private final ProcessService processService;
@Inject
public ReconvertAndLoadProcess(StateFactory stateFactory, ProcessService processService) {
super(stateFactory);
this.processService = processService;
}
@GraphState(name = INITIAL, next = RECONVERT)
public String init(String crawlJob) throws Exception {
Path path = Path.of(crawlJob);
if (!Files.exists(path)) {
error("Bad crawl job path");
}
Files.deleteIfExists(path.getParent().resolve("process/process.log"));
return path.toString();
}
@GraphState(name = RECONVERT, next = LOAD, resume = ResumeBehavior.RETRY)
public String reconvert(String crawlJob) throws Exception {
if (!processService.trigger(ProcessService.ProcessId.CONVERTER, Path.of(crawlJob)))
error();
return crawlJob;
}
@GraphState(name = LOAD, next = MOVE_INDEX_FILES, resume = ResumeBehavior.RETRY)
public void load(String crawlJob) throws Exception {
if (!processService.trigger(ProcessService.ProcessId.LOADER, Path.of(crawlJob)))
error();
}
@GraphState(name = MOVE_INDEX_FILES, next = END, resume = ResumeBehavior.ERROR)
public String moveIndexFiles(String crawlJob) throws Exception {
Path indexData = Path.of("/vol/index.dat");
Path indexDest = Path.of("/vol/iw/0/page-index.dat");
if (!Files.exists(indexData))
error("Index data not found");
Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING);
return crawlJob;
}
}

View File

@ -35,7 +35,7 @@ public class HeartbeatService {
rs.getString("SERVICE_NAME"), rs.getString("SERVICE_NAME"),
rs.getString("SERVICE_BASE"), rs.getString("SERVICE_BASE"),
trimUUID(rs.getString("INSTANCE")), trimUUID(rs.getString("INSTANCE")),
rs.getInt("TSDIFF") / 1000., rs.getLong("TSDIFF") / 1000.,
rs.getBoolean("ALIVE") rs.getBoolean("ALIVE")
)); ));
} }
@ -63,7 +63,7 @@ public class HeartbeatService {
rs.getString("PROCESS_NAME"), rs.getString("PROCESS_NAME"),
rs.getString("PROCESS_BASE"), rs.getString("PROCESS_BASE"),
trimUUID(rs.getString("INSTANCE")), trimUUID(rs.getString("INSTANCE")),
rs.getInt("TSDIFF") / 1000., rs.getLong("TSDIFF") / 1000.,
rs.getInt("PROGRESS"), rs.getInt("PROGRESS"),
rs.getString("STATUS") rs.getString("STATUS")
)); ));

View File

@ -0,0 +1,127 @@
package nu.marginalia.control.svc;
import com.google.inject.name.Named;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.utils.IOUtils;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Singleton
public class ProcessService {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final ServiceEventLog eventLog;
private final Path distPath;
private final ConcurrentHashMap<ProcessId, Process> processes = new ConcurrentHashMap<>();
public enum ProcessId {
CRAWLER("crawler-process/bin/crawler-process"),
CONVERTER("converter-process/bin/converter-process"),
LOADER("loader-process/bin/loader-process");
public final String path;
ProcessId(String path) {
this.path = path;
}
};
@Inject
public ProcessService(BaseServiceParams params,
@Named("distPath") Path distPath) {
this.eventLog = params.eventLog;
this.distPath = distPath;
}
public boolean trigger(ProcessId processId, Path plan) throws Exception {
String processPath = processPath(processId);
String[] args = new String[] {
processPath,
plan.toString()
};
String[] env = env(plan);
Process process;
if (!Files.exists(Path.of(processPath))) {
logger.error("Process not found: {}", processPath);
return false;
}
if (!Files.exists(plan)) {
logger.error("Plan not found: {}", processPath);
return false;
}
logger.info("Starting process: {}", processId + ": " + Arrays.toString(args) + " // " + Arrays.toString(env));
synchronized (processes) {
if (processes.containsKey(processId)) return false;
process = Runtime.getRuntime().exec(args, env);
processes.put(processId, process);
}
try (var es = new BufferedReader(new InputStreamReader(process.getErrorStream()));
var os = new BufferedReader(new InputStreamReader(process.getInputStream()))
) {
eventLog.logEvent("PROCESS-STARTED", processId.toString());
process.onExit().whenComplete((p,t) -> eventLog.logEvent("PROCESS-EXIT", processId.toString()));
while (process.isAlive()) {
if (es.ready())
logger.warn("{}:{}", processId, es.readLine());
if (os.ready())
logger.debug("{}:{}", processId, os.readLine());
}
return 0 == process.waitFor();
}
finally {
processes.remove(processId);
}
}
public boolean isRunning(ProcessId processId) {
return processes.containsKey(processId);
}
public boolean kill(ProcessId processId) {
Process process = processes.get(processId);
if (process == null) return false;
eventLog.logEvent("PROCESS-KILL", processId.toString());
process.destroy();
processes.remove(processId);
return true;
}
private String processPath(ProcessId id) {
return distPath.resolve(id.path).toString();
}
private String[] env(Path plan) {
Map<String, String> opts = new HashMap<>();
String WMSA_HOME = System.getenv("WMSA_HOME");
if (WMSA_HOME == null || WMSA_HOME.isBlank()) {
WMSA_HOME = "/var/lib/wmsa";
}
opts.put("WMSA_HOME", WMSA_HOME);
opts.put("JAVA_HOME", System.getenv("JAVA_HOME"));
opts.put("JAVA_OPTS", "-Dcrawl.rootDirRewrite=/crawl:" + plan.getParent().toString());
return opts.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toArray(String[]::new);
}
}

View File

@ -6,6 +6,8 @@ x-svc: &service
- conf:/wmsa/conf:ro - conf:/wmsa/conf:ro
- model:/wmsa/model - model:/wmsa/model
- data:/wmsa/data - data:/wmsa/data
- dist:/dist
- samples:/samples
- logs:/var/log/wmsa - logs:/var/log/wmsa
networks: networks:
- wmsa - wmsa
@ -143,4 +145,16 @@ volumes:
driver_opts: driver_opts:
type: none type: none
o: bind o: bind
device: run/data device: run/data
dist:
driver: local
driver_opts:
type: none
o: bind
device: run/dist
samples:
driver: local
driver_opts:
type: none
o: bind
device: run/samples

0
run/dist/.gitignore vendored Normal file
View File

3
run/env/service.env vendored
View File

@ -1 +1,2 @@
WMSA_HOME=run/ WMSA_HOME=run/
CONTROL_SERVICE_OPTS="-DdistPath=/dist"