From e162406d408166a267fcbc1ce6da44b52ec4184a Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 15 Jan 2024 15:44:23 +0100 Subject: [PATCH] (control) New control-side actors for cleaning up stale service heartbeats and message queue entries --- .../marginalia/actor/ActorStateMachine.java | 10 +++ .../mq/persistence/MqPersistence.java | 7 +- .../nu/marginalia/control/ControlService.java | 2 + .../control/actor/ControlActor.java | 1 + .../control/actor/ControlActorService.java | 20 ++++++ .../monitor/MessageQueueMonitorActor.java | 17 ++++- .../monitor/ServiceHeartbeatMonitorActor.java | 67 +++++++++++++++++++ .../control/sys/svc/HeartbeatService.java | 6 +- .../control/svc/HeartbeatServiceTest.java | 2 +- 9 files changed, 121 insertions(+), 11 deletions(-) create mode 100644 code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ServiceHeartbeatMonitorActor.java diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java index 8aad59cf..8bb2a068 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/actor/ActorStateMachine.java @@ -335,6 +335,16 @@ public class ActorStateMachine { return isDirectlyInitializable; } + public boolean isRunning() { + if (state == null) + return false; + + if (state.isFinal()) + return false; + + return true; + } + private class StateEventSubscription implements MqSubscription { @Override diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index 8c316c8e..2f4b1ea0 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -479,11 +479,12 @@ public class MqPersistence { /** Removes messages that have been set to a terminal state a while after their last update timestamp */ public int cleanOldMessages() throws SQLException { try (var conn = dataSource.getConnection(); + // Keep 72 hours of messages var setToDead = conn.prepareStatement(""" DELETE FROM MESSAGE_QUEUE - WHERE STATE = 'OK' - AND TTL IS NOT NULL - AND TIMESTAMPDIFF(SECOND, UPDATED_TIME, CURRENT_TIMESTAMP(6)) > 3600 + WHERE STATE IN ('OK', 'DEAD') + AND (TTL IS NULL OR TTL = 0) + AND TIMESTAMPDIFF(SECOND, UPDATED_TIME, CURRENT_TIMESTAMP(6)) > 72*3600 """)) { int ret = setToDead.executeUpdate(); if (!conn.getAutoCommit()) diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java index 06cce612..efe28e2f 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/ControlService.java @@ -105,6 +105,8 @@ public class ControlService extends Service { Spark.get("/public/:resource", this::serveStatic); monitors.subscribe(this::logMonitorStateChange); + + controlActorService.startDefaultActors(); } private Object overviewModel(Request request, Response response) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java index a9417124..694162cd 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActor.java @@ -3,6 +3,7 @@ package nu.marginalia.control.actor; public enum ControlActor { MONITOR_MESSAGE_QUEUE, + MONITOR_HEARTBEATS, REINDEX_ALL, REPROCESS_ALL, RECRAWL_ALL, diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java index 766cf23c..f993b835 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/ControlActorService.java @@ -8,6 +8,7 @@ import nu.marginalia.actor.ActorStateMachine; import nu.marginalia.actor.prototype.ActorPrototype; import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.control.actor.monitor.MessageQueueMonitorActor; +import nu.marginalia.control.actor.monitor.ServiceHeartbeatMonitorActor; import nu.marginalia.control.actor.precession.RecrawlAllActor; import nu.marginalia.control.actor.precession.ReindexAllActor; import nu.marginalia.control.actor.precession.ReprocessAllActor; @@ -15,6 +16,8 @@ import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.BaseServiceParams; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; @@ -24,6 +27,8 @@ import java.util.stream.Collectors; @Singleton public class ControlActorService { + private static final Logger logger = LoggerFactory.getLogger(ControlActorService.class); + private final ServiceEventLog eventLog; private final Gson gson; private final MessageQueueFactory messageQueueFactory; @@ -34,6 +39,7 @@ public class ControlActorService { public ControlActorService(MessageQueueFactory messageQueueFactory, BaseServiceParams baseServiceParams, MessageQueueMonitorActor messageQueueMonitor, + ServiceHeartbeatMonitorActor heartbeatMonitorActor, ReindexAllActor reindexAllActor, ReprocessAllActor reprocessAllActor, RecrawlAllActor recrawlAllActor @@ -45,6 +51,7 @@ public class ControlActorService { register(ControlActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor); + register(ControlActor.MONITOR_HEARTBEATS, heartbeatMonitorActor); register(ControlActor.REINDEX_ALL, reindexAllActor); register(ControlActor.REPROCESS_ALL, reprocessAllActor); register(ControlActor.RECRAWL_ALL, recrawlAllActor); @@ -120,4 +127,17 @@ public class ControlActorService { return actorDefinitions.get(actor); } + public void startDefaultActors() { + try { + if (!stateMachines.get(ControlActor.MONITOR_HEARTBEATS).isRunning()) { + start(ControlActor.MONITOR_HEARTBEATS); + } + if (!stateMachines.get(ControlActor.MONITOR_MESSAGE_QUEUE).isRunning()) { + start(ControlActor.MONITOR_MESSAGE_QUEUE); + } + } + catch (Exception ex) { + logger.error("Failed to start default actors", ex); + } + } } \ No newline at end of file diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java index 3d47d94d..acb1d2f6 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/MessageQueueMonitorActor.java @@ -8,12 +8,15 @@ import nu.marginalia.actor.state.ActorResumeBehavior; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.Resume; import nu.marginalia.mq.persistence.MqPersistence; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.sql.SQLException; import java.util.concurrent.TimeUnit; @Singleton public class MessageQueueMonitorActor extends RecordActorPrototype { - + private static final Logger logger = LoggerFactory.getLogger(MessageQueueMonitorActor.class); private final MqPersistence persistence; public record Initial() implements ActorStep {} @@ -26,9 +29,17 @@ public class MessageQueueMonitorActor extends RecordActorPrototype { case Initial i -> new Monitor(); case Monitor m -> { for (;;) { - persistence.reapDeadMessages(); - persistence.cleanOldMessages(); + // Sleep before reaping dead messages, to avoid problems during startup TimeUnit.SECONDS.sleep(60); + + try { + persistence.reapDeadMessages(); + persistence.cleanOldMessages(); + } + catch (SQLException ex) { + logger.warn("Failed to reap dead messages", ex); + } + } } default -> new Error(); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ServiceHeartbeatMonitorActor.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ServiceHeartbeatMonitorActor.java new file mode 100644 index 00000000..11d71a8f --- /dev/null +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/actor/monitor/ServiceHeartbeatMonitorActor.java @@ -0,0 +1,67 @@ +package nu.marginalia.control.actor.monitor; + +import com.google.gson.Gson; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.actor.prototype.RecordActorPrototype; +import nu.marginalia.actor.state.ActorResumeBehavior; +import nu.marginalia.actor.state.ActorStep; +import nu.marginalia.actor.state.Resume; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +@Singleton +public class ServiceHeartbeatMonitorActor extends RecordActorPrototype { + + private static final Logger logger = LoggerFactory.getLogger(ServiceHeartbeatMonitorActor.class); + private final HikariDataSource dataSource; + + public record Initial() implements ActorStep {} + @Resume(behavior=ActorResumeBehavior.RETRY) + public record Monitor() implements ActorStep {} + + @Override + public ActorStep transition(ActorStep self) throws Exception { + return switch (self) { + case Initial i -> new Monitor(); + case Monitor m -> { + for (;;) { + TimeUnit.SECONDS.sleep(10); + pruneDeadServices(); + } + } + default -> new Error(); + }; + } + + private void pruneDeadServices() { + try (var conn = dataSource.getConnection(); + var stmt = conn.createStatement()) { + // Delete heartbeats that haven't been seen in 10 days + stmt.execute(""" + DELETE FROM SERVICE_HEARTBEAT + WHERE TIMESTAMPDIFF(SECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) > 10*24*3600 + """); + } + catch (SQLException ex) { + logger.warn("Failed to prune dead services", ex); + } + } + + @Inject + public ServiceHeartbeatMonitorActor(Gson gson, + HikariDataSource dataSource) { + super(gson); + this.dataSource = dataSource; + } + + @Override + public String describe() { + return "Periodically cleans up dead services from the database"; + } + +} diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/HeartbeatService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/HeartbeatService.java index aa008062..6fb9931e 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/HeartbeatService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/HeartbeatService.java @@ -15,13 +15,10 @@ import java.util.List; @Singleton public class HeartbeatService { private final HikariDataSource dataSource; - private final ServiceEventLog eventLogService; @Inject - public HeartbeatService(HikariDataSource dataSource, - ServiceEventLog eventLogService) { + public HeartbeatService(HikariDataSource dataSource) { this.dataSource = dataSource; - this.eventLogService = eventLogService; } public List getServiceHeartbeats() { @@ -113,6 +110,7 @@ public class HeartbeatService { } return heartbeats; } + public void removeTaskHeartbeat(TaskHeartbeat heartbeat) { try (var conn = dataSource.getConnection(); var stmt = conn.prepareStatement(""" diff --git a/code/services-core/control-service/src/test/java/nu/marginalia/control/svc/HeartbeatServiceTest.java b/code/services-core/control-service/src/test/java/nu/marginalia/control/svc/HeartbeatServiceTest.java index ccb0b2ba..0d6c52ab 100644 --- a/code/services-core/control-service/src/test/java/nu/marginalia/control/svc/HeartbeatServiceTest.java +++ b/code/services-core/control-service/src/test/java/nu/marginalia/control/svc/HeartbeatServiceTest.java @@ -56,7 +56,7 @@ class HeartbeatServiceTest { @Test void removeTaskHeartbeat() throws SQLException { - var service = new HeartbeatService(dataSource, Mockito.mock(ServiceEventLog.class)); + var service = new HeartbeatService(dataSource); try (var conn = dataSource.getConnection(); var stmt = conn.createStatement()) {