(control) New control-side actors for cleaning up stale service heartbeats and message queue entries

This commit is contained in:
Viktor Lofgren 2024-01-15 15:44:23 +01:00
parent c41e68aaab
commit e162406d40
9 changed files with 121 additions and 11 deletions

View File

@ -335,6 +335,16 @@ public class ActorStateMachine {
return isDirectlyInitializable;
}
/**
 * Reports whether this state machine is currently in a non-final state.
 *
 * @return {@code false} when no state has been set or when the current
 *         state reports itself as final; {@code true} otherwise
 */
public boolean isRunning() {
    return state != null && !state.isFinal();
}
private class StateEventSubscription implements MqSubscription {
@Override

View File

@ -479,11 +479,12 @@ public class MqPersistence {
/** Removes messages that have been set to a terminal state a while after their last update timestamp */
public int cleanOldMessages() throws SQLException {
try (var conn = dataSource.getConnection();
// Keep 72 hours of messages
var setToDead = conn.prepareStatement("""
DELETE FROM MESSAGE_QUEUE
WHERE STATE = 'OK'
AND TTL IS NOT NULL
AND TIMESTAMPDIFF(SECOND, UPDATED_TIME, CURRENT_TIMESTAMP(6)) > 3600
WHERE STATE IN ('OK', 'DEAD')
AND (TTL IS NULL OR TTL = 0)
AND TIMESTAMPDIFF(SECOND, UPDATED_TIME, CURRENT_TIMESTAMP(6)) > 72*3600
""")) {
int ret = setToDead.executeUpdate();
if (!conn.getAutoCommit())

View File

@ -105,6 +105,8 @@ public class ControlService extends Service {
Spark.get("/public/:resource", this::serveStatic);
monitors.subscribe(this::logMonitorStateChange);
controlActorService.startDefaultActors();
}
private Object overviewModel(Request request, Response response) {

View File

@ -3,6 +3,7 @@ package nu.marginalia.control.actor;
public enum ControlActor {
MONITOR_MESSAGE_QUEUE,
MONITOR_HEARTBEATS,
REINDEX_ALL,
REPROCESS_ALL,
RECRAWL_ALL,

View File

@ -8,6 +8,7 @@ import nu.marginalia.actor.ActorStateMachine;
import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.control.actor.monitor.MessageQueueMonitorActor;
import nu.marginalia.control.actor.monitor.ServiceHeartbeatMonitorActor;
import nu.marginalia.control.actor.precession.RecrawlAllActor;
import nu.marginalia.control.actor.precession.ReindexAllActor;
import nu.marginalia.control.actor.precession.ReprocessAllActor;
@ -15,6 +16,8 @@ import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.BaseServiceParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
@ -24,6 +27,8 @@ import java.util.stream.Collectors;
@Singleton
public class ControlActorService {
private static final Logger logger = LoggerFactory.getLogger(ControlActorService.class);
private final ServiceEventLog eventLog;
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
@ -34,6 +39,7 @@ public class ControlActorService {
public ControlActorService(MessageQueueFactory messageQueueFactory,
BaseServiceParams baseServiceParams,
MessageQueueMonitorActor messageQueueMonitor,
ServiceHeartbeatMonitorActor heartbeatMonitorActor,
ReindexAllActor reindexAllActor,
ReprocessAllActor reprocessAllActor,
RecrawlAllActor recrawlAllActor
@ -45,6 +51,7 @@ public class ControlActorService {
register(ControlActor.MONITOR_MESSAGE_QUEUE, messageQueueMonitor);
register(ControlActor.MONITOR_HEARTBEATS, heartbeatMonitorActor);
register(ControlActor.REINDEX_ALL, reindexAllActor);
register(ControlActor.REPROCESS_ALL, reprocessAllActor);
register(ControlActor.RECRAWL_ALL, recrawlAllActor);
@ -120,4 +127,17 @@ public class ControlActorService {
return actorDefinitions.get(actor);
}
/**
 * Starts the heartbeat monitor and the message queue monitor, in that order,
 * skipping any whose state machine already reports that it is running.
 *
 * Any exception raised along the way is logged and not propagated; a failure
 * on the first actor therefore also prevents the second from being started.
 */
public void startDefaultActors() {
    final ControlActor[] defaultActors = {
            ControlActor.MONITOR_HEARTBEATS,
            ControlActor.MONITOR_MESSAGE_QUEUE
    };

    try {
        for (ControlActor actor : defaultActors) {
            var machine = stateMachines.get(actor);
            if (machine.isRunning()) {
                continue;
            }
            start(actor);
        }
    }
    catch (Exception ex) {
        logger.error("Failed to start default actors", ex);
    }
}
}

View File

@ -8,12 +8,15 @@ import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
@Singleton
public class MessageQueueMonitorActor extends RecordActorPrototype {
private static final Logger logger = LoggerFactory.getLogger(MessageQueueMonitorActor.class);
private final MqPersistence persistence;
public record Initial() implements ActorStep {}
@ -26,9 +29,17 @@ public class MessageQueueMonitorActor extends RecordActorPrototype {
case Initial i -> new Monitor();
case Monitor m -> {
for (;;) {
// Sleep before reaping dead messages, to avoid problems during startup
TimeUnit.SECONDS.sleep(60);
try {
persistence.reapDeadMessages();
persistence.cleanOldMessages();
TimeUnit.SECONDS.sleep(60);
}
catch (SQLException ex) {
logger.warn("Failed to reap dead messages", ex);
}
}
}
default -> new Error();

View File

@ -0,0 +1,67 @@
package nu.marginalia.control.actor.monitor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
/**
 * Actor that repeatedly deletes old rows from the SERVICE_HEARTBEAT table.
 *
 * The actor moves from {@link Initial} to {@link Monitor} and then stays in
 * the monitor step, sleeping between each pruning pass.
 */
@Singleton
public class ServiceHeartbeatMonitorActor extends RecordActorPrototype {
    private static final Logger logger = LoggerFactory.getLogger(ServiceHeartbeatMonitorActor.class);

    /** Pause, in seconds, taken before each pruning pass. */
    private static final long PRUNE_INTERVAL_SECONDS = 10;

    /** Removes heartbeat rows whose HEARTBEAT_TIME is more than 10 days old. */
    private static final String PRUNE_SQL = """
            DELETE FROM SERVICE_HEARTBEAT
            WHERE TIMESTAMPDIFF(SECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) > 10*24*3600
            """;

    private final HikariDataSource dataSource;

    /** Entry step; {@link #transition} maps it directly to {@link Monitor}. */
    public record Initial() implements ActorStep {}

    /**
     * Steady-state step in which the pruning loop runs.
     * NOTE(review): presumably the RETRY resume behavior re-enters this step
     * after a restart -- confirm against the actor framework.
     */
    @Resume(behavior=ActorResumeBehavior.RETRY)
    public record Monitor() implements ActorStep {}

    @Inject
    public ServiceHeartbeatMonitorActor(Gson gson,
                                        HikariDataSource dataSource) {
        super(gson);
        this.dataSource = dataSource;
    }

    @Override
    public String describe() {
        return "Periodically cleans up dead services from the database";
    }

    /**
     * Advances the actor: {@link Initial} yields {@link Monitor}, {@link Monitor}
     * enters the endless pruning loop, and any other step yields an error step.
     *
     * @param self the step the actor is currently in
     * @return the next step; never returns normally for {@link Monitor}
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public ActorStep transition(ActorStep self) throws Exception {
        return switch (self) {
            case Initial initial -> new Monitor();
            case Monitor monitor -> pruneForever();
            default -> new Error();
        };
    }

    /** Sleeps, then prunes, without end; leaves only by throwing. */
    private ActorStep pruneForever() throws InterruptedException {
        while (true) {
            TimeUnit.SECONDS.sleep(PRUNE_INTERVAL_SECONDS);
            pruneDeadServices();
        }
    }

    /**
     * Runs one pruning pass. SQL failures are logged as warnings and not
     * rethrown, so the surrounding loop keeps going.
     */
    private void pruneDeadServices() {
        try (var connection = dataSource.getConnection();
             var statement = connection.createStatement()) {
            // Delete heartbeats that haven't been seen in 10 days
            statement.execute(PRUNE_SQL);
        }
        catch (SQLException ex) {
            logger.warn("Failed to prune dead services", ex);
        }
    }
}

View File

@ -15,13 +15,10 @@ import java.util.List;
@Singleton
public class HeartbeatService {
private final HikariDataSource dataSource;
private final ServiceEventLog eventLogService;
@Inject
public HeartbeatService(HikariDataSource dataSource,
ServiceEventLog eventLogService) {
public HeartbeatService(HikariDataSource dataSource) {
this.dataSource = dataSource;
this.eventLogService = eventLogService;
}
public List<ServiceHeartbeat> getServiceHeartbeats() {
@ -113,6 +110,7 @@ public class HeartbeatService {
}
return heartbeats;
}
public void removeTaskHeartbeat(TaskHeartbeat heartbeat) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""

View File

@ -56,7 +56,7 @@ class HeartbeatServiceTest {
@Test
void removeTaskHeartbeat() throws SQLException {
var service = new HeartbeatService(dataSource, Mockito.mock(ServiceEventLog.class));
var service = new HeartbeatService(dataSource);
try (var conn = dataSource.getConnection();
var stmt = conn.createStatement()) {