diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
index 58447beb..8b747a85 100644
--- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
+++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java
@@ -20,6 +20,7 @@ import nu.marginalia.service.id.ServiceId;
import java.util.List;
import javax.annotation.CheckReturnValue;
+import java.util.ServiceConfigurationError;
import java.util.UUID;
@Singleton
@@ -39,7 +40,7 @@ public class IndexClient extends AbstractDynamicClient {
this.messageQueueFactory = messageQueueFactory;
String inboxName = ServiceId.Index.name;
- String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString());
+ String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID());
setTimeout(30);
}
diff --git a/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql b/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql
new file mode 100644
index 00000000..62bc939c
--- /dev/null
+++ b/code/common/db/src/main/resources/db/migration/V24_01_0_003__mqaudit.sql
@@ -0,0 +1 @@
+ALTER TABLE MESSAGE_QUEUE ADD COLUMN AUDIT_RELATED_ID LONG NOT NULL DEFAULT -1 COMMENT 'To be applied to any new messages created while handling a message';
\ No newline at end of file
diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java
index 7411b6e2..049f2bb2 100644
--- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java
+++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqAsynchronousInbox.java
@@ -2,6 +2,7 @@ package nu.marginalia.mq.inbox;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,7 +115,9 @@ public class MqAsynchronousInbox implements MqInboxIf {
for (var eventSubscriber : eventSubscribers) {
if (eventSubscriber.filter(msg)) {
+
handleMessageWithSubscriber(eventSubscriber, msg);
+
handled = true;
break;
}
@@ -136,16 +139,21 @@ public class MqAsynchronousInbox implements MqInboxIf {
private void respondToMessage(MqSubscription subscriber, MqMessage msg) {
try {
+ MqMessageHandlerRegistry.register(msg.msgId());
final var rsp = subscriber.onRequest(msg);
+
if (msg.expectsResponse()) {
sendResponse(msg, rsp.state(), rsp.message());
}
else {
registerResponse(msg, rsp.state());
}
+
} catch (Exception ex) {
logger.error("Message Queue subscriber threw exception", ex);
registerResponse(msg, MqMessageState.ERR);
+ } finally {
+ MqMessageHandlerRegistry.deregister();
}
}
diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java
index 19645c64..8b761383 100644
--- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java
+++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSingleShotInbox.java
@@ -3,6 +3,7 @@ package nu.marginalia.mq.inbox;
import lombok.SneakyThrows;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import java.sql.SQLException;
@@ -45,7 +46,11 @@ public class MqSingleShotInbox {
var messages = persistence.pollInbox(inboxName, instanceUUID, i, 1);
- if (messages.size() > 0) {
+ if (!messages.isEmpty()) {
+ for (var message : messages) {
+ MqMessageHandlerRegistry.register(message.msgId());
+ }
+
return Optional.of(messages.iterator().next());
}
diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java
index 9fa0ef4d..99a81c1e 100644
--- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java
+++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/inbox/MqSynchronousInbox.java
@@ -2,6 +2,7 @@ package nu.marginalia.mq.inbox;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
+import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -81,7 +82,10 @@ public class MqSynchronousInbox implements MqInboxIf {
private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) {
try {
+ MqMessageHandlerRegistry.register(msg.msgId());
+
final var rsp = subscriber.onRequest(msg);
+
if (msg.expectsResponse()) {
sendResponse(msg, rsp.state(), rsp.message());
}
@@ -92,6 +96,9 @@ public class MqSynchronousInbox implements MqInboxIf {
logger.error("Message Queue subscriber threw exception", ex);
registerResponse(msg, MqMessageState.ERR);
}
+ finally {
+ MqMessageHandlerRegistry.deregister();
+ }
}
private void registerResponse(MqMessage msg, MqMessageState state) {
diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java
index f144e300..b1f0a44b 100644
--- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java
+++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/outbox/MqOutbox.java
@@ -122,6 +122,7 @@ public class MqOutbox {
gson.toJson(request),
null);
}
+
/** Blocks until a response arrives for the given message id (possibly forever) */
public MqMessage waitResponse(long id) throws Exception {
synchronized (pendingResponses) {
diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java
new file mode 100644
index 00000000..b3acc394
--- /dev/null
+++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqMessageHandlerRegistry.java
@@ -0,0 +1,35 @@
+package nu.marginalia.mq.persistence;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Keeps track of which thread is handling a message, to be able to
+ * paint outgoing messages with a AUDIT_RELATED_ID to relate the
+ * outgoing message to the incoming message that triggered it.
+ *
+ * This is a pure audit field, a weaker version of the RELATED_ID,
+ * which is used by e.g. state machines to relate a series of messages to each other.
+ *
+ * The class is thread-safe, and tracks the thread ID of the thread that
+ * is currently handling a message. It can be cleaned up by calling
+ * deregister() when the message has been handled.
+ */
+public class MqMessageHandlerRegistry {
+ // There is some small risk of a memory leak here, if the registry entries aren't cleaned up properly,
+ // but due to the low volume of messages being sent, this is not a big concern. Since the average
+ // message rate is less than 1 per second, even if the process ran for 60 years, and we leaked every ID
+ // put in, the total amount of memory leaked would only be about of order 2 MB.
+
+ private static final ConcurrentHashMap handlerRegistry = new ConcurrentHashMap<>();
+
+ public static void register(long msgId) {
+ handlerRegistry.put(Thread.currentThread().threadId(), msgId);
+ }
+
+ public static long getOriginMessage() {
+ return handlerRegistry.getOrDefault(Thread.currentThread().threadId(), -1L);
+ }
+
+ public static void deregister() {
+ handlerRegistry.remove(Thread.currentThread().threadId());
+ }
+}
diff --git a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java
index f32213b4..02740232 100644
--- a/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java
+++ b/code/libraries/message-queue/src/main/java/nu/marginalia/mq/persistence/MqPersistence.java
@@ -55,8 +55,8 @@ public class MqPersistence {
) throws Exception {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
- INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, PAYLOAD, TTL)
- VALUES(?, ?, ?, ?, ?, ?)
+ INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, AUDIT_RELATED_ID, FUNCTION, PAYLOAD, TTL)
+ VALUES(?, ?, ?, ?, ?, ?, ?)
""");
var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")) {
@@ -67,11 +67,12 @@ public class MqPersistence {
// Translate null to -1, as 0 is a valid id
stmt.setLong(3, Objects.requireNonNullElse(relatedMessageId, -1L));
+ stmt.setLong(4, MqMessageHandlerRegistry.getOriginMessage());
- stmt.setString(4, function);
- stmt.setString(5, payload);
- if (ttl == null) stmt.setNull(6, java.sql.Types.BIGINT);
- else stmt.setLong(6, ttl.toSeconds());
+ stmt.setString(5, function);
+ stmt.setString(6, payload);
+ if (ttl == null) stmt.setNull(7, java.sql.Types.BIGINT);
+ else stmt.setLong(7, ttl.toSeconds());
stmt.executeUpdate();
diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java
index b63a614c..61a842f3 100644
--- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java
+++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/model/MessageQueueEntry.java
@@ -5,6 +5,7 @@ import java.time.LocalDate;
public record MessageQueueEntry (
long id,
long relatedId,
+ long auditRelatedId,
String senderInbox,
String recipientInbox,
String function,
@@ -20,6 +21,9 @@ public record MessageQueueEntry (
public boolean hasRelatedMessage() {
return relatedId > 0;
}
+ public boolean hasAuditRelation() {
+ return auditRelatedId > 0;
+ }
public String stateCode() {
if (state == null) {
diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java
index 27060739..cfef6577 100644
--- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java
+++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java
@@ -57,7 +57,7 @@ public class ControlSysActionsService {
*/
private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) {
String inboxName = ServiceId.Api.name + ":" + "0";
- String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
+ String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID());
}
diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java
index 7e74ed94..922d4a20 100644
--- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java
+++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/MessageQueueService.java
@@ -180,7 +180,7 @@ public class MessageQueueService {
public List getLastEntries(int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
ORDER BY ID DESC
LIMIT ?
@@ -202,7 +202,7 @@ public class MessageQueueService {
public MessageQueueEntry getMessage(long id) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID=?
""")) {
@@ -223,7 +223,7 @@ public class MessageQueueService {
public Object getLastEntriesForInbox(String inbox, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX=?
ORDER BY ID DESC
@@ -247,7 +247,7 @@ public class MessageQueueService {
public List getEntriesForInbox(String inbox, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?)
ORDER BY ID DESC
@@ -274,7 +274,7 @@ public class MessageQueueService {
public List getEntriesForInstance(String instance, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND OWNER_INSTANCE = ?
ORDER BY ID DESC
@@ -300,7 +300,7 @@ public class MessageQueueService {
public List getEntries(long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ?
ORDER BY ID DESC
@@ -335,9 +335,10 @@ public class MessageQueueService {
// and only available within the operator user interface.
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
- SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
+ SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
- WHERE ID = ? OR RELATED_ID = ?
+ WHERE (ID = ? OR RELATED_ID = ? OR AUDIT_RELATED_ID = ?)
+ AND STATE != 'DEAD' AND FUNCTION != 'MONITOR'
ORDER BY ID DESC
""")) {
@@ -349,6 +350,7 @@ public class MessageQueueService {
ps.setLong(1, nextId);
ps.setLong(2, nextId);
+ ps.setLong(3, nextId);
var rs = ps.executeQuery();
while (rs.next()) {
@@ -361,6 +363,8 @@ public class MessageQueueService {
newRelatedIds.add(entry.id());
if (entry.hasRelatedMessage() && !queriedIds.contains(entry.relatedId()))
newRelatedIds.add(entry.relatedId());
+ if (entry.hasAuditRelation() && !queriedIds.contains(entry.auditRelatedId()))
+ newRelatedIds.add(entry.auditRelatedId());
}
}
}
@@ -376,6 +380,7 @@ public class MessageQueueService {
return new MessageQueueEntry(
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
+ rs.getLong("AUDIT_RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),
diff --git a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb
index a31bc1ce..2959fa49 100644
--- a/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb
+++ b/code/services-core/control-service/src/main/resources/templates/control/partials/message-queue-table.hdb
@@ -50,11 +50,9 @@
{{ttl}} |
- {{#if hasRelatedMessage}}
- {{relatedId}}
- {{else}}
- {{relatedId}}
- {{/if}}
+ {{#if hasRelatedMessage}}{{relatedId}}{{else}}{{relatedId}}{{/if}}
+ /
+ {{#if hasAuditRelation}}{{auditRelatedId}}{{else}}{{auditRelatedId}}{{/if}}
|
{{senderInbox}} |
{{payload}} |
diff --git a/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb b/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb
index 69686f05..0cda4d5c 100644
--- a/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb
+++ b/code/services-core/control-service/src/main/resources/templates/control/sys/view-message.hdb
@@ -22,11 +22,10 @@
state | {{state}} | [Edit State] |
senderInbox | {{senderInbox}} | {{#if senderInbox}}[Reply]{{/if}} |
relatedId |
- {{#if hasRelatedMessage}}
- {{relatedId}}
- {{else}}
- {{relatedId}}
- {{/if}}
+ {{#if hasRelatedMessage}} {{relatedId}} {{else}} {{relatedId}} {{/if}}
+ | |
+ auditRelatedId |
+ {{#if hasAuditRelation}}{{auditRelatedId}}{{else}}{{auditRelatedId}}{{/if}}
| |
function | {{function}} | |
payload |
@@ -42,20 +41,25 @@
Related Messages
{{/if}}
diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java
index a05700fe..cb5ad1bd 100644
--- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java
+++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/monitor/AbstractProcessSpawnerActor.java
@@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.*;
+import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
@@ -59,6 +60,9 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
}
// else continue
} else {
+ // Special: Associate this thread with the message so that we can get tracking
+ MqMessageHandlerRegistry.register(messages.getFirst().msgId());
+
yield new Run(0);
}
}
|