From 6271d5d544f480bb07b5b9255958a76d76937417 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Thu, 18 Jan 2024 15:08:27 +0100 Subject: [PATCH] (mq) Add relation tracking between MQ messages for easier tracking and debugging. The change adds a new column to the MESSAGE_QUEUE table called AUDIT_RELATED_ID. This field is populated transparently, using a dictionary mapping Thread IDs to Message IDs, populated by the inbox handlers. The existing RELATED_ID field has too many semantics associated with them, among other things the FSM code uses them this field in tracking state changes. The change set also improves the consistency of inbox names. The IndexClient was buggy and populated its outbox with a UUID. This is fixed. All Service2Service outboxes are now prefixed with 'pp:' to make them even easier to differentiate. --- .../marginalia/index/client/IndexClient.java | 3 +- .../db/migration/V24_01_0_003__mqaudit.sql | 1 + .../mq/inbox/MqAsynchronousInbox.java | 8 +++++ .../mq/inbox/MqSingleShotInbox.java | 7 +++- .../mq/inbox/MqSynchronousInbox.java | 7 ++++ .../nu/marginalia/mq/outbox/MqOutbox.java | 1 + .../persistence/MqMessageHandlerRegistry.java | 35 +++++++++++++++++++ .../mq/persistence/MqPersistence.java | 13 +++---- .../control/sys/model/MessageQueueEntry.java | 4 +++ .../sys/svc/ControlSysActionsService.java | 2 +- .../control/sys/svc/MessageQueueService.java | 21 ++++++----- .../control/partials/message-queue-table.hdb | 8 ++--- .../templates/control/sys/view-message.hdb | 22 +++++++----- .../monitor/AbstractProcessSpawnerActor.java | 4 +++ 14 files changed, 105 insertions(+), 31 deletions(-) create mode 100644 code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql create mode 100644 code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 58447beb..8b747a85 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -20,6 +20,7 @@ import nu.marginalia.service.id.ServiceId; import java.util.List; import javax.annotation.CheckReturnValue; +import java.util.ServiceConfigurationError; import java.util.UUID; @Singleton @@ -39,7 +40,7 @@ public class IndexClient extends AbstractDynamicClient { this.messageQueueFactory = messageQueueFactory; String inboxName = ServiceId.Index.name; - String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString()); + String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID()); setTimeout(30); } diff --git a/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql b/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql new file mode 100644 index 00000000..62bc939c --- /dev/null +++ b/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql @@ -0,0 +1 @@ +ALTER TABLE MESSAGE_QUEUE ADD COLUMN AUDIT_RELATED_ID LONG NOT NULL DEFAULT -1 COMMENT 'To be applied to any new messages created while handling a message'; \ No newline at end of file diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java index 7411b6e2..049f2bb2 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java @@ -2,6 +2,7 @@ package nu.marginalia.mq.inbox; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.mq.persistence.MqPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,7 +115,9 @@ public class MqAsynchronousInbox implements MqInboxIf { for (var eventSubscriber : eventSubscribers) { if (eventSubscriber.filter(msg)) { + handleMessageWithSubscriber(eventSubscriber, msg); + handled = true; break; } @@ -136,16 +139,21 @@ public class MqAsynchronousInbox implements MqInboxIf { private void respondToMessage(MqSubscription subscriber, MqMessage msg) { try { + MqMessageHandlerRegistry.register(msg.msgId()); final var rsp = subscriber.onRequest(msg); + if (msg.expectsResponse()) { sendResponse(msg, rsp.state(), rsp.message()); } else { registerResponse(msg, rsp.state()); } + } catch (Exception ex) { logger.error("Message Queue subscriber threw exception", ex); registerResponse(msg, MqMessageState.ERR); + } finally { + MqMessageHandlerRegistry.deregister(); } } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java index 19645c64..8b761383 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java @@ -3,6 +3,7 @@ package nu.marginalia.mq.inbox; import lombok.SneakyThrows; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.mq.persistence.MqPersistence; import java.sql.SQLException; @@ -45,7 +46,11 @@ public class MqSingleShotInbox { var messages = persistence.pollInbox(inboxName, instanceUUID, i, 1); - if (messages.size() > 0) { + if (!messages.isEmpty()) { + for (var message : messages) { + MqMessageHandlerRegistry.register(message.msgId()); + } + return Optional.of(messages.iterator().next()); } diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java index 9fa0ef4d..99a81c1e 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java @@ -2,6 +2,7 @@ package nu.marginalia.mq.inbox; import nu.marginalia.mq.MqMessage; import nu.marginalia.mq.MqMessageState; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.mq.persistence.MqPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -81,7 +82,10 @@ public class MqSynchronousInbox implements MqInboxIf { private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) { try { + MqMessageHandlerRegistry.register(msg.msgId()); + final var rsp = subscriber.onRequest(msg); + if (msg.expectsResponse()) { sendResponse(msg, rsp.state(), rsp.message()); } @@ -92,6 +96,9 @@ public class MqSynchronousInbox implements MqInboxIf { logger.error("Message Queue subscriber threw exception", ex); registerResponse(msg, MqMessageState.ERR); } + finally { + MqMessageHandlerRegistry.deregister(); + } } private void registerResponse(MqMessage msg, MqMessageState state) { diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java index f144e300..b1f0a44b 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java @@ -122,6 +122,7 @@ public class MqOutbox { gson.toJson(request), null); } + /** Blocks until a response arrives for the given message id (possibly forever) */ public MqMessage waitResponse(long id) throws Exception { synchronized (pendingResponses) { diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java new file mode 100644 index 00000000..b3acc394 --- /dev/null +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java @@ -0,0 +1,35 @@ +package nu.marginalia.mq.persistence; + +import java.util.concurrent.ConcurrentHashMap; + +/** Keeps track of which thread is handling a message, to be able to + * paint outgoing messages with a AUDIT_RELATED_ID to relate the + * outgoing message to the incoming message that triggered it. + *

+ * This is a pure audit field, a weaker version of the RELATED_ID, + * which is used by e.g. state machines to relate a series of messages to each other. + *

+ * The class is thread-safe, and tracks the thread ID of the thread that + * is currently handling a message. It can be cleaned up by calling + * deregister() when the message has been handled. + */ +public class MqMessageHandlerRegistry { + // There is some small risk of a memory leak here, if the registry entries aren't cleaned up properly, + // but due to the low volume of messages being sent, this is not a big concern. Since the average + // message rate is less than 1 per second, even if the process ran for 60 years, and we leaked every ID + // put in, the total amount of memory leaked would only be about of order 2 MB. + + private static final ConcurrentHashMap handlerRegistry = new ConcurrentHashMap<>(); + + public static void register(long msgId) { + handlerRegistry.put(Thread.currentThread().threadId(), msgId); + } + + public static long getOriginMessage() { + return handlerRegistry.getOrDefault(Thread.currentThread().threadId(), -1L); + } + + public static void deregister() { + handlerRegistry.remove(Thread.currentThread().threadId()); + } +} diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java index f32213b4..02740232 100644 --- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java +++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java @@ -55,8 +55,8 @@ public class MqPersistence { ) throws Exception { try (var conn = dataSource.getConnection(); var stmt = conn.prepareStatement(""" - INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, PAYLOAD, TTL) - VALUES(?, ?, ?, ?, ?, ?) + INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, AUDIT_RELATED_ID, FUNCTION, PAYLOAD, TTL) + VALUES(?, ?, ?, ?, ?, ?, ?) """); var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")) { @@ -67,11 +67,12 @@ public class MqPersistence { // Translate null to -1, as 0 is a valid id stmt.setLong(3, Objects.requireNonNullElse(relatedMessageId, -1L)); + stmt.setLong(4, MqMessageHandlerRegistry.getOriginMessage()); - stmt.setString(4, function); - stmt.setString(5, payload); - if (ttl == null) stmt.setNull(6, java.sql.Types.BIGINT); - else stmt.setLong(6, ttl.toSeconds()); + stmt.setString(5, function); + stmt.setString(6, payload); + if (ttl == null) stmt.setNull(7, java.sql.Types.BIGINT); + else stmt.setLong(7, ttl.toSeconds()); stmt.executeUpdate(); diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java index b63a614c..61a842f3 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java @@ -5,6 +5,7 @@ import java.time.LocalDate; public record MessageQueueEntry ( long id, long relatedId, + long auditRelatedId, String senderInbox, String recipientInbox, String function, @@ -20,6 +21,9 @@ public record MessageQueueEntry ( public boolean hasRelatedMessage() { return relatedId > 0; } + public boolean hasAuditRelation() { + return auditRelatedId > 0; + } public String stateCode() { if (state == null) { diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java index 27060739..cfef6577 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java @@ -57,7 +57,7 @@ public class ControlSysActionsService { */ private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) { String inboxName = ServiceId.Api.name + ":" + "0"; - String outboxName = System.getProperty("service-name", UUID.randomUUID().toString()); + String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID()); } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java index 7e74ed94..922d4a20 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java @@ -180,7 +180,7 @@ public class MessageQueueService { public List getLastEntries(int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE ORDER BY ID DESC LIMIT ? @@ -202,7 +202,7 @@ public class MessageQueueService { public MessageQueueEntry getMessage(long id) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID=? """)) { @@ -223,7 +223,7 @@ public class MessageQueueService { public Object getLastEntriesForInbox(String inbox, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE RECIPIENT_INBOX=? ORDER BY ID DESC @@ -247,7 +247,7 @@ public class MessageQueueService { public List getEntriesForInbox(String inbox, long afterId, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?) ORDER BY ID DESC @@ -274,7 +274,7 @@ public class MessageQueueService { public List getEntriesForInstance(String instance, long afterId, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID < ? AND OWNER_INSTANCE = ? ORDER BY ID DESC @@ -300,7 +300,7 @@ public class MessageQueueService { public List getEntries(long afterId, int n) { try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE WHERE ID < ? ORDER BY ID DESC @@ -335,9 +335,10 @@ public class MessageQueueService { // and only available within the operator user interface. try (var conn = dataSource.getConnection(); var ps = conn.prepareStatement(""" - SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL + SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL FROM MESSAGE_QUEUE - WHERE ID = ? OR RELATED_ID = ? + WHERE (ID = ? OR RELATED_ID = ? OR AUDIT_RELATED_ID = ?) + AND STATE != 'DEAD' AND FUNCTION != 'MONITOR' ORDER BY ID DESC """)) { @@ -349,6 +350,7 @@ public class MessageQueueService { ps.setLong(1, nextId); ps.setLong(2, nextId); + ps.setLong(3, nextId); var rs = ps.executeQuery(); while (rs.next()) { @@ -361,6 +363,8 @@ public class MessageQueueService { newRelatedIds.add(entry.id()); if (entry.hasRelatedMessage() && !queriedIds.contains(entry.relatedId())) newRelatedIds.add(entry.relatedId()); + if (entry.hasAuditRelation() && !queriedIds.contains(entry.auditRelatedId())) + newRelatedIds.add(entry.auditRelatedId()); } } } @@ -376,6 +380,7 @@ public class MessageQueueService { return new MessageQueueEntry( rs.getLong("ID"), rs.getLong("RELATED_ID"), + rs.getLong("AUDIT_RELATED_ID"), rs.getString("SENDER_INBOX"), rs.getString("RECIPIENT_INBOX"), rs.getString("FUNCTION"), diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb index a31bc1ce..2959fa49 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb @@ -50,11 +50,9 @@ {{ttl}} - {{#if hasRelatedMessage}} - {{relatedId}} - {{else}} - {{relatedId}} - {{/if}} + {{#if hasRelatedMessage}}{{relatedId}}{{else}}{{relatedId}}{{/if}} + / + {{#if hasAuditRelation}}{{auditRelatedId}}{{else}}{{auditRelatedId}}{{/if}} {{senderInbox}} {{payload}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb b/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb index 69686f05..0cda4d5c 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb @@ -22,11 +22,10 @@ state{{state}}[Edit State] senderInbox{{senderInbox}}{{#if senderInbox}}[Reply]{{/if}} relatedId - {{#if hasRelatedMessage}} - {{relatedId}} - {{else}} - {{relatedId}} - {{/if}} + {{#if hasRelatedMessage}} {{relatedId}} {{else}} {{relatedId}} {{/if}} + + auditRelatedId + {{#if hasAuditRelation}}{{auditRelatedId}}{{else}}{{auditRelatedId}}{{/if}} function{{function}} payload @@ -42,20 +41,25 @@

Related Messages

- + - + + + {{#each relatedMessages}} - + - + {{/each}}
IDID/RelatedID Recipient InboxSender Inbox Function State
Payload
{{id}} + {{id}} + {{#if hasRelatedMessage}}/ {{relatedId}}{{/if}} + {{recipientInbox}}{{senderInbox}} {{function}} {{state}}
{{payload}}
{{/if}} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java index a05700fe..cb5ad1bd 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java @@ -5,6 +5,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.*; +import nu.marginalia.mq.persistence.MqMessageHandlerRegistry; import nu.marginalia.process.ProcessService; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.persistence.MqPersistence; @@ -59,6 +60,9 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype { } // else continue } else { + // Special: Associate this thread with the message so that we can get tracking + MqMessageHandlerRegistry.register(messages.getFirst().msgId()); + yield new Run(0); } }