diff --git a/build.gradle b/build.gradle index 155d5b89..49bf7b98 100644 --- a/build.gradle +++ b/build.gradle @@ -9,14 +9,24 @@ version 'SNAPSHOT' compileJava.options.encoding = "UTF-8" compileTestJava.options.encoding = "UTF-8" -task dist(type: Copy) { +tasks.register('dist', Copy) { from subprojects.collect { it.tasks.withType(Tar) } into "$buildDir/dist" -}
+    doLast {
+        // Unpack each process distribution tarball into run/dist, in the same order as before.
+        ['converter-process', 'crawler-process', 'loader-process'].each { processName ->
+            copy {
+                from tarTree("$buildDir/dist/${processName}.tar")
+                into "$projectDir/run/dist/"
+            }
+        }
+    }
+}
 idea { module { excludeDirs.add(file("$projectDir/run/model")) + excludeDirs.add(file("$projectDir/run/dist")) excludeDirs.add(file("$projectDir/run/samples")) excludeDirs.add(file("$projectDir/run/db")) excludeDirs.add(file("$projectDir/run/logs")) diff --git a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java index 7e56e6ba..b8ffc739 100644 --- a/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java +++ b/code/common/message-queue/src/main/java/nu/marginalia/mqsm/StateMachine.java @@ -103,6 +103,28 @@ public class StateMachine { smOutbox.notify(transition.state(), transition.message()); }
+    /**
+     * Initialize the state machine with an argument for the initial state.
+     * <p>
+     * Builds a transition to the "INITIAL" state carrying the given argument as its
+     * message, installs that state while holding this object's monitor (waking any
+     * waiters), starts the inbox, and finally announces the transition on the outbox.
+     *
+     * @param jsonEncodedArgument message sent along with the transition to INITIAL
+     * @throws Exception propagated from starting the inbox or notifying the outbox
+     */
+    public void init(String jsonEncodedArgument) throws Exception {
+        var transition = StateTransition.to("INITIAL", jsonEncodedArgument);
+
+        synchronized (this) {
+            this.state = allStates.get(transition.state());
+            notifyAll();
+        }
+
+        smInbox.start();
+        smOutbox.notify(transition.state(), transition.message());
+    }
+
 /** Resume the state machine from the last known state. 
*/ public void resume() throws Exception { diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index c17193a7..c70573a6 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -108,9 +108,7 @@ public class LoaderMain { try { AtomicInteger loadTotal = new AtomicInteger(); - WorkLog.readLog(logFile, entry -> { - loadTotal.incrementAndGet(); - }); + WorkLog.readLog(logFile, entry -> loadTotal.incrementAndGet()); LoaderMain.loadTotal = loadTotal.get(); AtomicInteger loaded = new AtomicInteger(); diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java index 09d5be2e..338e722f 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java @@ -28,7 +28,7 @@ public class LoaderModule extends AbstractModule { bind(ServiceDescriptors.class).toInstance(SearchServiceDescriptors.descriptors); bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("loader", 0, UUID.randomUUID())); - bind(Gson.class).toInstance(createGson()); + bind(Gson.class).toProvider(this::createGson); bind(Path.class).annotatedWith(Names.named("local-index-path")).toInstance(Path.of(System.getProperty("local-index-path", "/vol"))); bind(LanguageModels.class).toInstance(WmsaHome.getLanguageModels()); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlMain.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlMain.java index e3d12163..52307353 100644 --- 
a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlMain.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlMain.java @@ -21,6 +21,7 @@ public class ControlMain extends MainClass { Injector injector = Guice.createInjector( new DatabaseModule(), + new ControlProcessModule(), new ConfigurationModule(SearchServiceDescriptors.descriptors, ServiceId.Control)); injector.getInstance(ControlMain.class); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlProcessModule.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlProcessModule.java new file mode 100644 index 00000000..3530a89b --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlProcessModule.java @@ -0,0 +1,41 @@
+package nu.marginalia.control;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.Module;
+import com.google.inject.name.Names;
+
+import java.nio.file.Path;
+
+/**
+ * Guice module binding the directory that holds the process distributions to the
+ * {@code distPath} named {@link Path}.
+ * <p>
+ * The {@code distPath} system property wins when set; otherwise the location
+ * defaults to {@code <WMSA_HOME>/dist/current}.
+ */
+public class ControlProcessModule extends AbstractModule {
+    /** Home directory assumed when WMSA_HOME is configured nowhere. */
+    private static final String DEFAULT_WMSA_HOME = "/var/lib/wmsa";
+
+    @Override
+    protected void configure() {
+        String dist = System.getProperty("distPath", wmsaHome() + "/dist/current");
+        bind(Path.class).annotatedWith(Names.named("distPath")).toInstance(Path.of(dist));
+    }
+
+    /**
+     * Resolves WMSA_HOME. The system property is consulted first (the original lookup),
+     * then the environment variable, then the built-in default, so the fallback path
+     * never starts with the literal text "null".
+     */
+    private static String wmsaHome() {
+        String home = System.getProperty("WMSA_HOME");
+        if (home == null || home.isBlank()) {
+            home = System.getenv("WMSA_HOME");
+        }
+        if (home == null || home.isBlank()) {
+            home = DEFAULT_WMSA_HOME;
+        }
+        return home;
+    }
+}
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java index 39969083..9d660a1e 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -5,10 +5,7 @@ import com.google.inject.Inject; import nu.marginalia.client.ServiceMonitors; import nu.marginalia.control.model.ControlProcess; import nu.marginalia.control.process.ControlProcesses; 
-import nu.marginalia.control.svc.EventLogService; -import nu.marginalia.control.svc.HeartbeatService; -import nu.marginalia.control.svc.MessageQueueMonitorService; -import nu.marginalia.control.svc.MessageQueueViewService; +import nu.marginalia.control.svc.*; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.renderer.MustacheRenderer; @@ -22,6 +19,7 @@ import spark.Response; import spark.Spark; import java.io.IOException; +import java.nio.file.Path; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -51,7 +49,8 @@ public class ControlService extends Service { ControlProcesses controlProcesses, StaticResources staticResources, MessageQueueViewService messageQueueViewService, - MessageQueueMonitorService messageQueueMonitorService + MessageQueueMonitorService messageQueueMonitorService, + ProcessService processService ) throws IOException { super(params); @@ -84,6 +83,11 @@ public class ControlService extends Service { controlProcesses.start(ControlProcess.REPARTITION_REINDEX); return "OK"; }); + // TODO: This should be a POST + Spark.get("/public/reconvert", (req, rsp) -> { + controlProcesses.start(ControlProcess.RECONVERT_LOAD, "/samples/crawl-blogs/plan.yaml"); + return "OK"; + }); Spark.get("/public/:resource", this::serveStatic); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java index 613dd2e5..b7db26db 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/model/ControlProcess.java @@ -1,7 +1,8 @@ package nu.marginalia.control.model; public enum ControlProcess { - REPARTITION_REINDEX; + REPARTITION_REINDEX, + RECONVERT_LOAD; public String id() { return "fsm:" + 
name().toLowerCase(); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java index 5813bdbb..6b8a64eb 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ControlProcesses.java @@ -1,8 +1,10 @@ package nu.marginalia.control.process; +import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.control.model.ControlProcess; +import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.mqsm.StateMachine; import nu.marginalia.mqsm.graph.AbstractStateGraph; @@ -17,17 +19,21 @@ import java.util.UUID; public class ControlProcesses { private final MqPersistence persistence; private final ServiceEventLog eventLog; + private final Gson gson; public Map stateMachines = new HashMap<>(); @Inject public ControlProcesses(MqPersistence persistence, + GsonFactory gsonFactory, BaseServiceParams baseServiceParams, - RepartitionReindexProcess repartitionReindexProcess + RepartitionReindexProcess repartitionReindexProcess, + ReconvertAndLoadProcess reconvertAndLoadProcess ) { this.persistence = persistence; this.eventLog = baseServiceParams.eventLog; - + this.gson = gsonFactory.get(); register(ControlProcess.REPARTITION_REINDEX, repartitionReindexProcess); + register(ControlProcess.RECONVERT_LOAD, reconvertAndLoadProcess); } private void register(ControlProcess process, AbstractStateGraph graph) { @@ -48,6 +54,12 @@ public class ControlProcesses { stateMachines.get(process).init(); } + public void start(ControlProcess process, Object arg) throws Exception { + eventLog.logEvent("FSM-START", process.id()); + + 
stateMachines.get(process).init(gson.toJson(arg)); + } + public void resume(ControlProcess process) throws Exception { eventLog.logEvent("FSM-RESUME", process.id()); stateMachines.get(process).resume(); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java new file mode 100644 index 00000000..1b329b97 --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/process/ReconvertAndLoadProcess.java @@ -0,0 +1,118 @@
+package nu.marginalia.control.process;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import nu.marginalia.control.svc.ProcessService;
+import nu.marginalia.index.client.IndexClient;
+import nu.marginalia.index.client.IndexMqEndpoints;
+import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.outbox.MqOutbox;
+import nu.marginalia.mqsm.StateFactory;
+import nu.marginalia.mqsm.graph.AbstractStateGraph;
+import nu.marginalia.mqsm.graph.GraphState;
+import nu.marginalia.mqsm.graph.ResumeBehavior;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+
+/**
+ * State graph that runs the converter and then the loader for a crawl plan and
+ * finally moves the produced index file into place.
+ * <p>
+ * States are declared to run INITIAL, RECONVERT, LOAD, MOVE_INDEX_FILES, END.
+ */
+@Singleton
+public class ReconvertAndLoadProcess extends AbstractStateGraph {
+
+    // STATES
+
+    private static final String INITIAL = "INITIAL";
+    private static final String RECONVERT = "RECONVERT";
+    private static final String LOAD = "LOAD";
+    private static final String MOVE_INDEX_FILES = "MOVE_INDEX_FILES";
+    private static final String END = "END";
+    private final ProcessService processService;
+
+
+    @Inject
+    public ReconvertAndLoadProcess(StateFactory stateFactory, ProcessService processService) {
+        super(stateFactory);
+        this.processService = processService;
+    }
+
+    /**
+     * Checks that the crawl plan exists and removes a leftover process log next to it.
+     *
+     * @param crawlJob path of the crawl plan file
+     * @return the plan path, handed on to the RECONVERT state
+     */
+    @GraphState(name = INITIAL, next = RECONVERT)
+    public String init(String crawlJob) throws Exception {
+        Path path = Path.of(crawlJob);
+
+        // NOTE(review): error(...) presumably aborts the state by throwing -- confirm in AbstractStateGraph.
+        if (!Files.exists(path)) {
+            error("Bad crawl job path");
+        }
+
+        // A bare file name has no parent; look the directory up on the absolute
+        // path so the cleanup cannot dereference null.
+        Path planDir = path.toAbsolutePath().getParent();
+        if (planDir != null) {
+            Files.deleteIfExists(planDir.resolve("process/process.log"));
+        }
+
+        return path.toString();
+    }
+
+    /**
+     * Runs the converter process on the plan and calls error() when the trigger reports failure.
+     *
+     * @param crawlJob path of the crawl plan file
+     * @return the same plan path, handed on to the LOAD state
+     */
+    @GraphState(name = RECONVERT, next = LOAD, resume = ResumeBehavior.RETRY)
+    public String reconvert(String crawlJob) throws Exception {
+        if (!processService.trigger(ProcessService.ProcessId.CONVERTER, Path.of(crawlJob))) {
+            error();
+        }
+
+        return crawlJob;
+    }
+
+    /**
+     * Runs the loader process on the plan and calls error() when the trigger reports failure.
+     * <p>
+     * NOTE(review): unlike the neighbouring states this one returns nothing, so what
+     * MOVE_INDEX_FILES receives as its argument depends on the framework -- confirm.
+     *
+     * @param crawlJob path of the crawl plan file
+     */
+    @GraphState(name = LOAD, next = MOVE_INDEX_FILES, resume = ResumeBehavior.RETRY)
+    public void load(String crawlJob) throws Exception {
+        if (!processService.trigger(ProcessService.ProcessId.LOADER, Path.of(crawlJob))) {
+            error();
+        }
+    }
+
+    /**
+     * Moves /vol/index.dat to /vol/iw/0/page-index.dat, replacing any existing file.
+     *
+     * @param crawlJob argument received from the previous state; only passed through
+     * @return {@code crawlJob}, unchanged
+     */
+    @GraphState(name = MOVE_INDEX_FILES, next = END, resume = ResumeBehavior.ERROR)
+    public String moveIndexFiles(String crawlJob) throws Exception {
+        Path indexData = Path.of("/vol/index.dat");
+        Path indexDest = Path.of("/vol/iw/0/page-index.dat");
+
+        if (!Files.exists(indexData)) {
+            error("Index data not found");
+        }
+
+        Files.move(indexData, indexDest, StandardCopyOption.REPLACE_EXISTING);
+
+        return crawlJob;
+    }
+}
diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java index def90b42..bd7f56c7 100644 --- a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/HeartbeatService.java @@ -35,7 +35,7 @@ public class HeartbeatService { rs.getString("SERVICE_NAME"), rs.getString("SERVICE_BASE"), trimUUID(rs.getString("INSTANCE")), - rs.getInt("TSDIFF") / 1000., + rs.getLong("TSDIFF") / 1000., rs.getBoolean("ALIVE") )); } @@ -63,7 +63,7 @@ public class HeartbeatService { rs.getString("PROCESS_NAME"), rs.getString("PROCESS_BASE"), trimUUID(rs.getString("INSTANCE")), - rs.getInt("TSDIFF") / 1000., + rs.getLong("TSDIFF") 
/ 1000., rs.getInt("PROGRESS"), rs.getString("STATUS") )); diff --git a/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java new file mode 100644 index 00000000..b5198a9e --- /dev/null +++ b/code/services-satellite/control-service/src/main/java/nu/marginalia/control/svc/ProcessService.java @@ -0,0 +1,197 @@
+package nu.marginalia.control.svc;
+
+import com.google.inject.name.Named;
+import nu.marginalia.service.control.ServiceEventLog;
+import nu.marginalia.service.server.BaseServiceParams;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import spark.utils.IOUtils;
+
+import javax.inject.Inject;
+import javax.inject.Singleton;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Launches the external crawler, converter and loader executables found under
+ * the distribution directory, relays their output to the log, and keeps track
+ * of which of them is currently running.
+ */
+@Singleton
+public class ProcessService {
+    /** Upper bound on how long an idle output poll waits before checking again. */
+    private static final long OUTPUT_POLL_INTERVAL_MS = 50;
+
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private final ServiceEventLog eventLog;
+    private final Path distPath;
+
+    private final ConcurrentHashMap<ProcessId, Process> processes = new ConcurrentHashMap<>();
+
+    /** The launchable processes, each with its executable's path relative to the distribution directory. */
+    public enum ProcessId {
+        CRAWLER("crawler-process/bin/crawler-process"),
+        CONVERTER("converter-process/bin/converter-process"),
+        LOADER("loader-process/bin/loader-process");
+
+        public final String path;
+        ProcessId(String path) {
+            this.path = path;
+        }
+    }
+
+    @Inject
+    public ProcessService(BaseServiceParams params,
+                          @Named("distPath") Path distPath) {
+        this.eventLog = params.eventLog;
+        this.distPath = distPath;
+    }
+
+    /**
+     * Runs the given process with the plan as its only argument and blocks until it exits.
+     *
+     * @param processId which executable to launch
+     * @param plan      path to the plan file handed to the executable
+     * @return {@code true} if the process ran and exited with status 0; {@code false} if the
+     *         executable or plan is missing, the process is already running, or it exited non-zero
+     * @throws Exception if launching the process or reading its output fails
+     */
+    public boolean trigger(ProcessId processId, Path plan) throws Exception {
+        String processPath = processPath(processId);
+
+        if (!Files.exists(Path.of(processPath))) {
+            logger.error("Process not found: {}", processPath);
+            return false;
+        }
+        if (!Files.exists(plan)) {
+            // Report the missing plan itself rather than the executable path.
+            logger.error("Plan not found: {}", plan);
+            return false;
+        }
+
+        String[] args = new String[] {
+                processPath,
+                plan.toString()
+        };
+        String[] env = env(plan);
+
+        Process process;
+
+        logger.info("Starting process: {}: {} // {}", processId, Arrays.toString(args), Arrays.toString(env));
+        synchronized (processes) {
+            if (processes.containsKey(processId)) return false;
+            process = Runtime.getRuntime().exec(args, env);
+            processes.put(processId, process);
+        }
+
+        try (var es = new BufferedReader(new InputStreamReader(process.getErrorStream()));
+             var os = new BufferedReader(new InputStreamReader(process.getInputStream()))
+        ) {
+            eventLog.logEvent("PROCESS-STARTED", processId.toString());
+            process.onExit().whenComplete((p,t) -> eventLog.logEvent("PROCESS-EXIT", processId.toString()));
+
+            while (process.isAlive()) {
+                if (!relayPendingOutput(processId, es, os)) {
+                    // Nothing to relay: wait until the process exits or the poll interval
+                    // elapses instead of spinning on ready().
+                    process.waitFor(OUTPUT_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
+                }
+            }
+
+            // Relay whatever was still buffered when the process exited.
+            while (relayPendingOutput(processId, es, os)) {
+                // keep draining
+            }
+
+            return 0 == process.waitFor();
+        }
+        finally {
+            // Only unregister our own process; kill() may already have removed it.
+            processes.remove(processId, process);
+        }
+    }
+
+    /**
+     * Logs at most one pending line from each stream: stderr at WARN, stdout at DEBUG.
+     *
+     * @return {@code true} if at least one line was relayed
+     */
+    private boolean relayPendingOutput(ProcessId processId, BufferedReader es, BufferedReader os) throws IOException {
+        boolean relayed = false;
+        if (es.ready()) {
+            logger.warn("{}:{}", processId, es.readLine());
+            relayed = true;
+        }
+        if (os.ready()) {
+            logger.debug("{}:{}", processId, os.readLine());
+            relayed = true;
+        }
+        return relayed;
+    }
+
+    /** Reports whether a process with this id is currently registered as running. */
+    public boolean isRunning(ProcessId processId) {
+        return processes.containsKey(processId);
+    }
+
+    /**
+     * Asks the registered process with this id to terminate and unregisters it.
+     *
+     * @return {@code false} if no such process is registered, {@code true} otherwise
+     */
+    public boolean kill(ProcessId processId) {
+        Process process = processes.get(processId);
+        if (process == null) return false;
+
+        eventLog.logEvent("PROCESS-KILL", processId.toString());
+        process.destroy();
+        processes.remove(processId, process);
+
+        return true;
+    }
+
+    private String processPath(ProcessId id) {
+        return distPath.resolve(id.path).toString();
+    }
+
+    /**
+     * Builds the child's environment as {@code NAME=value} entries: WMSA_HOME (defaulting to
+     * /var/lib/wmsa), JAVA_HOME when this JVM has one, and JAVA_OPTS pointing the crawl root
+     * rewrite at the plan's directory.
+     */
+    private String[] env(Path plan) {
+
+        Map<String, String> opts = new HashMap<>();
+        String WMSA_HOME = System.getenv("WMSA_HOME");
+        if (WMSA_HOME == null || WMSA_HOME.isBlank()) {
+            WMSA_HOME = "/var/lib/wmsa";
+        }
+        opts.put("WMSA_HOME", WMSA_HOME);
+
+        // Skip JAVA_HOME when unset; concatenation would otherwise export the text "null".
+        String javaHome = System.getenv("JAVA_HOME");
+        if (javaHome != null) {
+            opts.put("JAVA_HOME", javaHome);
+        }
+
+        // A bare file name has no parent; fall back to the absolute path's directory.
+        Path planDir = plan.getParent();
+        if (planDir == null) {
+            planDir = plan.toAbsolutePath().getParent();
+        }
+        if (planDir == null) {
+            throw new IllegalArgumentException("Plan has no parent directory: " + plan);
+        }
+        opts.put("JAVA_OPTS", "-Dcrawl.rootDirRewrite=/crawl:" + planDir);
+
+        return opts.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).toArray(String[]::new);
+    }
+}
diff --git a/docker-compose.yml b/docker-compose.yml index 8490d5a7..4aff54db 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,8 @@ x-svc: &service - conf:/wmsa/conf:ro - model:/wmsa/model - data:/wmsa/data + - dist:/dist + - samples:/samples - logs:/var/log/wmsa networks: - wmsa @@ -143,4 +145,16 @@ volumes: driver_opts: type: none o: bind - device: run/data \ No newline at end of file + device: run/data + dist: + driver: local + driver_opts: + type: none + o: bind + device: run/dist + samples: + driver: local + driver_opts: + type: none + o: bind + device: run/samples \ No newline at end of file diff --git a/run/dist/.gitignore b/run/dist/.gitignore new file mode 100644 index 00000000..e69de29b diff --git a/run/env/service.env b/run/env/service.env index 2fb7f09e..db871699 100644 --- a/run/env/service.env +++ b/run/env/service.env @@ -1 +1,2 @@ -WMSA_HOME=run/ \ No newline at end of file +WMSA_HOME=run/ +CONTROL_SERVICE_OPTS="-DdistPath=/dist" \ No newline at end of file