(minor) Refactor ControlService

This commit is contained in:
Viktor Lofgren 2023-07-11 14:51:51 +02:00
parent 4c016b0318
commit 4ee3f6ba3f
5 changed files with 75 additions and 39 deletions

View File

@ -5,6 +5,10 @@ import com.google.inject.Inject;
import nu.marginalia.client.ServiceMonitors;
import nu.marginalia.control.model.ControlProcess;
import nu.marginalia.control.process.ControlProcesses;
import nu.marginalia.control.svc.EventLogService;
import nu.marginalia.control.svc.HeartbeatService;
import nu.marginalia.control.svc.MessageQueueMonitorService;
import nu.marginalia.control.svc.MessageQueueViewService;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.renderer.MustacheRenderer;
@ -34,7 +38,7 @@ public class ControlService extends Service {
private final MustacheRenderer<Map<?,?>> messageQueueRenderer;
private final MqPersistence messageQueuePersistence;
private final StaticResources staticResources;
private final ServiceEventLog eventLog;
private final MessageQueueMonitorService messageQueueMonitorService;
@Inject
@ -46,12 +50,12 @@ public class ControlService extends Service {
MqPersistence messageQueuePersistence,
ControlProcesses controlProcesses,
StaticResources staticResources,
MessageQueueViewService messageQueueViewService
MessageQueueViewService messageQueueViewService,
MessageQueueMonitorService messageQueueMonitorService
) throws IOException {
super(params);
this.monitors = monitors;
this.eventLog = params.eventLog;
indexRenderer = rendererFactory.renderer("control/index");
servicesRenderer = rendererFactory.renderer("control/services");
@ -61,6 +65,7 @@ public class ControlService extends Service {
this.messageQueuePersistence = messageQueuePersistence;
this.staticResources = staticResources;
this.messageQueueMonitorService = messageQueueMonitorService;
Spark.get("/public/heartbeats", (req, res) -> {
res.type("application/json");
@ -74,6 +79,7 @@ public class ControlService extends Service {
Spark.get("/public/events", (req, rsp) -> eventsRenderer.render(Map.of("events", eventLogService.getLastEntries(20))));
Spark.get("/public/message-queue", (req, rsp) -> messageQueueRenderer.render(Map.of("messages", messageQueueViewService.getLastEntries(20))));
// TODO: This should be a POST
Spark.get("/public/repartition", (req, rsp) -> {
controlProcesses.start(ControlProcess.REPARTITION_REINDEX);
return "OK";
@ -82,10 +88,6 @@ public class ControlService extends Service {
Spark.get("/public/:resource", this::serveStatic);
monitors.subscribe(this::logMonitorStateChange);
Thread reaperThread = new Thread(this::reapMessageQueue, "message-queue-reaper");
reaperThread.setDaemon(true);
reaperThread.start();
}
@ -98,35 +100,6 @@ public class ControlService extends Service {
}
private void reapMessageQueue() {
for (;;) {
try {
TimeUnit.MINUTES.sleep(10);
int outcome = messageQueuePersistence.reapDeadMessages();
if (outcome > 0) {
eventLog.logEvent("MESSAGE-QUEUE-REAPED", Integer.toString(outcome));
logger.info("Reaped {} dead messages from message queue", outcome);
}
outcome = messageQueuePersistence.cleanOldMessages();
if (outcome > 0) {
eventLog.logEvent("MESSAGE-QUEUE-CLEANED", Integer.toString(outcome));
logger.info("Cleaned {} stale messages from message queue", outcome);
}
}
catch (InterruptedException ex) {
logger.info("Message queue reaper interrupted");
return;
}
catch (Exception ex) {
logger.error("Message queue reaper failed", ex);
}
}
}
private void logMonitorStateChange() {
logger.info("Service state change: {}", monitors.getRunningServices());
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.control;
package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;

View File

@ -1,4 +1,4 @@
package nu.marginalia.control;
package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;

View File

@ -0,0 +1,63 @@
package nu.marginalia.control.svc;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.inject.Inject;
import javax.inject.Singleton;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@Singleton
public class MessageQueueMonitorService {
private final Logger logger = LoggerFactory.getLogger(MessageQueueMonitorService.class);
private final MqPersistence persistence;
private final ServiceEventLog eventLog;
@Inject
public MessageQueueMonitorService(BaseServiceParams params) {
this.persistence = params.messageQueuePersistence;
this.eventLog = params.eventLog;
Thread reaperThread = new Thread(this::run, "message-queue-reaper");
reaperThread.setDaemon(true);
reaperThread.start();
}
private void run() {
for (;;) {
try {
TimeUnit.MINUTES.sleep(10);
reapMessages();
}
catch (InterruptedException ex) {
logger.info("Message queue reaper interrupted");
break;
}
catch (Exception ex) {
logger.error("Message queue reaper failed", ex);
}
}
}
private void reapMessages() throws SQLException {
int outcome = persistence.reapDeadMessages();
if (outcome > 0) {
eventLog.logEvent("MESSAGE-QUEUE-REAPED", Integer.toString(outcome));
logger.info("Reaped {} dead messages from message queue", outcome);
}
outcome = persistence.cleanOldMessages();
if (outcome > 0) {
eventLog.logEvent("MESSAGE-QUEUE-CLEANED", Integer.toString(outcome));
logger.info("Cleaned {} stale messages from message queue", outcome);
}
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.control;
package nu.marginalia.control.svc;
import com.google.inject.Inject;
import com.google.inject.Singleton;