(mq) Add relation tracking between MQ messages for easier tracking and debugging.

The change adds a new column to the MESSAGE_QUEUE table called AUDIT_RELATED_ID.  This field is populated transparently, using a dictionary mapping Thread IDs to Message IDs, populated by the inbox handlers.

The existing RELATED_ID field has too many semantics associated with them,
among other things the FSM code uses them this field in tracking state changes.

The change set also improves the consistency of inbox names.  The IndexClient was buggy and populated its outbox with a UUID.  This is fixed. All Service2Service outboxes are now prefixed with 'pp:' to make them even easier to differentiate.
This commit is contained in:
Viktor Lofgren 2024-01-18 15:08:27 +01:00
parent 175bd310f5
commit 6271d5d544
14 changed files with 105 additions and 31 deletions

View File

@ -20,6 +20,7 @@ import nu.marginalia.service.id.ServiceId;
import java.util.List;
import javax.annotation.CheckReturnValue;
import java.util.ServiceConfigurationError;
import java.util.UUID;
@Singleton
@ -39,7 +40,7 @@ public class IndexClient extends AbstractDynamicClient {
this.messageQueueFactory = messageQueueFactory;
String inboxName = ServiceId.Index.name;
String outboxName = System.getProperty("service-name:"+nodeId, UUID.randomUUID().toString());
String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID());
setTimeout(30);
}

View File

@ -0,0 +1 @@
ALTER TABLE MESSAGE_QUEUE ADD COLUMN AUDIT_RELATED_ID LONG NOT NULL DEFAULT -1 COMMENT 'To be applied to any new messages created while handling a message';

View File

@ -2,6 +2,7 @@ package nu.marginalia.mq.inbox;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -114,7 +115,9 @@ public class MqAsynchronousInbox implements MqInboxIf {
for (var eventSubscriber : eventSubscribers) {
if (eventSubscriber.filter(msg)) {
handleMessageWithSubscriber(eventSubscriber, msg);
handled = true;
break;
}
@ -136,16 +139,21 @@ public class MqAsynchronousInbox implements MqInboxIf {
private void respondToMessage(MqSubscription subscriber, MqMessage msg) {
try {
MqMessageHandlerRegistry.register(msg.msgId());
final var rsp = subscriber.onRequest(msg);
if (msg.expectsResponse()) {
sendResponse(msg, rsp.state(), rsp.message());
}
else {
registerResponse(msg, rsp.state());
}
} catch (Exception ex) {
logger.error("Message Queue subscriber threw exception", ex);
registerResponse(msg, MqMessageState.ERR);
} finally {
MqMessageHandlerRegistry.deregister();
}
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.mq.inbox;
import lombok.SneakyThrows;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import java.sql.SQLException;
@ -45,7 +46,11 @@ public class MqSingleShotInbox {
var messages = persistence.pollInbox(inboxName, instanceUUID, i, 1);
if (messages.size() > 0) {
if (!messages.isEmpty()) {
for (var message : messages) {
MqMessageHandlerRegistry.register(message.msgId());
}
return Optional.of(messages.iterator().next());
}

View File

@ -2,6 +2,7 @@ package nu.marginalia.mq.inbox;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.mq.persistence.MqPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -81,7 +82,10 @@ public class MqSynchronousInbox implements MqInboxIf {
private void handleMessageWithSubscriber(MqSubscription subscriber, MqMessage msg) {
try {
MqMessageHandlerRegistry.register(msg.msgId());
final var rsp = subscriber.onRequest(msg);
if (msg.expectsResponse()) {
sendResponse(msg, rsp.state(), rsp.message());
}
@ -92,6 +96,9 @@ public class MqSynchronousInbox implements MqInboxIf {
logger.error("Message Queue subscriber threw exception", ex);
registerResponse(msg, MqMessageState.ERR);
}
finally {
MqMessageHandlerRegistry.deregister();
}
}
private void registerResponse(MqMessage msg, MqMessageState state) {

View File

@ -122,6 +122,7 @@ public class MqOutbox {
gson.toJson(request),
null);
}
/** Blocks until a response arrives for the given message id (possibly forever) */
public MqMessage waitResponse(long id) throws Exception {
synchronized (pendingResponses) {

View File

@ -0,0 +1,35 @@
package nu.marginalia.mq.persistence;
import java.util.concurrent.ConcurrentHashMap;
/** Keeps track of which thread is handling a message, to be able to
* paint outgoing messages with a AUDIT_RELATED_ID to relate the
* outgoing message to the incoming message that triggered it.
* <p></p>
* This is a pure audit field, a weaker version of the RELATED_ID,
* which is used by e.g. state machines to relate a series of messages to each other.
* <p></p>
* The class is thread-safe, and tracks the thread ID of the thread that
* is currently handling a message. It can be cleaned up by calling
* deregister() when the message has been handled.
*/
public class MqMessageHandlerRegistry {
// There is some small risk of a memory leak here, if the registry entries aren't cleaned up properly,
// but due to the low volume of messages being sent, this is not a big concern. Since the average
// message rate is less than 1 per second, even if the process ran for 60 years, and we leaked every ID
// put in, the total amount of memory leaked would only be about of order 2 MB.
private static final ConcurrentHashMap<Long, Long> handlerRegistry = new ConcurrentHashMap<>();
public static void register(long msgId) {
handlerRegistry.put(Thread.currentThread().threadId(), msgId);
}
public static long getOriginMessage() {
return handlerRegistry.getOrDefault(Thread.currentThread().threadId(), -1L);
}
public static void deregister() {
handlerRegistry.remove(Thread.currentThread().threadId());
}
}

View File

@ -55,8 +55,8 @@ public class MqPersistence {
) throws Exception {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, FUNCTION, PAYLOAD, TTL)
VALUES(?, ?, ?, ?, ?, ?)
INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, RELATED_ID, AUDIT_RELATED_ID, FUNCTION, PAYLOAD, TTL)
VALUES(?, ?, ?, ?, ?, ?, ?)
""");
var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")) {
@ -67,11 +67,12 @@ public class MqPersistence {
// Translate null to -1, as 0 is a valid id
stmt.setLong(3, Objects.requireNonNullElse(relatedMessageId, -1L));
stmt.setLong(4, MqMessageHandlerRegistry.getOriginMessage());
stmt.setString(4, function);
stmt.setString(5, payload);
if (ttl == null) stmt.setNull(6, java.sql.Types.BIGINT);
else stmt.setLong(6, ttl.toSeconds());
stmt.setString(5, function);
stmt.setString(6, payload);
if (ttl == null) stmt.setNull(7, java.sql.Types.BIGINT);
else stmt.setLong(7, ttl.toSeconds());
stmt.executeUpdate();

View File

@ -5,6 +5,7 @@ import java.time.LocalDate;
public record MessageQueueEntry (
long id,
long relatedId,
long auditRelatedId,
String senderInbox,
String recipientInbox,
String function,
@ -20,6 +21,9 @@ public record MessageQueueEntry (
public boolean hasRelatedMessage() {
return relatedId > 0;
}
public boolean hasAuditRelation() {
return auditRelatedId > 0;
}
public String stateCode() {
if (state == null) {

View File

@ -57,7 +57,7 @@ public class ControlSysActionsService {
*/
private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) {
String inboxName = ServiceId.Api.name + ":" + "0";
String outboxName = System.getProperty("service-name", UUID.randomUUID().toString());
String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID());
}

View File

@ -180,7 +180,7 @@ public class MessageQueueService {
public List<MessageQueueEntry> getLastEntries(int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
ORDER BY ID DESC
LIMIT ?
@ -202,7 +202,7 @@ public class MessageQueueService {
public MessageQueueEntry getMessage(long id) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID=?
""")) {
@ -223,7 +223,7 @@ public class MessageQueueService {
public Object getLastEntriesForInbox(String inbox, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX=?
ORDER BY ID DESC
@ -247,7 +247,7 @@ public class MessageQueueService {
public List<MessageQueueEntry> getEntriesForInbox(String inbox, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND (RECIPIENT_INBOX = ? OR SENDER_INBOX = ?)
ORDER BY ID DESC
@ -274,7 +274,7 @@ public class MessageQueueService {
public List<MessageQueueEntry> getEntriesForInstance(String instance, long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ? AND OWNER_INSTANCE = ?
ORDER BY ID DESC
@ -300,7 +300,7 @@ public class MessageQueueService {
public List<MessageQueueEntry> getEntries(long afterId, int n) {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID < ?
ORDER BY ID DESC
@ -335,9 +335,10 @@ public class MessageQueueService {
// and only available within the operator user interface.
try (var conn = dataSource.getConnection();
var ps = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
SELECT ID, RELATED_ID, AUDIT_RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, PAYLOAD, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM MESSAGE_QUEUE
WHERE ID = ? OR RELATED_ID = ?
WHERE (ID = ? OR RELATED_ID = ? OR AUDIT_RELATED_ID = ?)
AND STATE != 'DEAD' AND FUNCTION != 'MONITOR'
ORDER BY ID DESC
""")) {
@ -349,6 +350,7 @@ public class MessageQueueService {
ps.setLong(1, nextId);
ps.setLong(2, nextId);
ps.setLong(3, nextId);
var rs = ps.executeQuery();
while (rs.next()) {
@ -361,6 +363,8 @@ public class MessageQueueService {
newRelatedIds.add(entry.id());
if (entry.hasRelatedMessage() && !queriedIds.contains(entry.relatedId()))
newRelatedIds.add(entry.relatedId());
if (entry.hasAuditRelation() && !queriedIds.contains(entry.auditRelatedId()))
newRelatedIds.add(entry.auditRelatedId());
}
}
}
@ -376,6 +380,7 @@ public class MessageQueueService {
return new MessageQueueEntry(
rs.getLong("ID"),
rs.getLong("RELATED_ID"),
rs.getLong("AUDIT_RELATED_ID"),
rs.getString("SENDER_INBOX"),
rs.getString("RECIPIENT_INBOX"),
rs.getString("FUNCTION"),

View File

@ -50,11 +50,9 @@
<tr>
<td>{{ttl}}</td>
<td>
{{#if hasRelatedMessage}}
<a href="/message-queue/{{relatedId}}">{{relatedId}}</a>
{{else}}
{{relatedId}}
{{/if}}
{{#if hasRelatedMessage}}<a href="/message-queue/{{relatedId}}">{{relatedId}}</a>{{else}}{{relatedId}}{{/if}}
/
{{#if hasAuditRelation}}<a href="/message-queue/{{auditRelatedId}}">{{auditRelatedId}}</a>{{else}}{{auditRelatedId}}{{/if}}
</td>
<td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td>
<td style="word-break: break-all; font-family: monospace;">{{payload}}</td>

View File

@ -22,11 +22,10 @@
<tr><td>state</td><td>{{state}}</td><td><a href="{{id}}/edit">[Edit&nbsp;State]</a></td></tr>
<tr><td>senderInbox</td><td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td><td>{{#if senderInbox}}<a href="{{id}}/reply">[Reply]</a>{{/if}}</td></tr>
<tr><td>relatedId</td><td>
{{#if hasRelatedMessage}}
<a href="{{relatedId}}">{{relatedId}}</a>
{{else}}
{{relatedId}}
{{/if}}
{{#if hasRelatedMessage}} <a href="{{relatedId}}">{{relatedId}}</a> {{else}} {{relatedId}} {{/if}}
</td><td></td></tr>
<tr><td>auditRelatedId</td><td>
{{#if hasAuditRelation}}<a href="/message-queue/{{auditRelatedId}}">{{auditRelatedId}}</a>{{else}}{{auditRelatedId}}{{/if}}
</td><td></td></tr>
<tr><td>function</td><td>{{function}}</td><td></td></tr>
<tr><td>payload</td><td>
@ -42,20 +41,25 @@
<h2>Related Messages</h2>
<table class="table">
<tr>
<th>ID</th>
<th>ID/RelatedID</th>
<th>Recipient Inbox</th>
<th>Sender Inbox</th>
<th>Function</th>
<th>State</th>
</tr>
<tr>
<th colspan="4">Payload</th>
</tr>
{{#each relatedMessages}}
<tr>
<td><a href="{{id}}">{{id}}</a></td>
<td>
<a {{#eq id message.id}}style="font-weight: bold"{{/eq}} href="{{id}}">{{id}}</a>
{{#if hasRelatedMessage}}/ <a href="{{relatedId}}" {{#eq relatedId message.id}}style="font-weight: bold"{{/eq}}>{{relatedId}}</a>{{/if}}
</td>
<td><a href="/message-queue?inbox={{recipientInbox}}">{{recipientInbox}}</a></td>
<td><a href="/message-queue?inbox={{senderInbox}}">{{senderInbox}}</a></td>
<td>{{function}}</td>
<td>{{state}}</td>
</tr>
<tr><td colspan="4" style="word-break: break-all; font-family: monospace;">{{payload}}</td> </tr>
{{/each}}
</table>
{{/if}}

View File

@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.*;
import nu.marginalia.mq.persistence.MqMessageHandlerRegistry;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
@ -59,6 +60,9 @@ public class AbstractProcessSpawnerActor extends RecordActorPrototype {
}
// else continue
} else {
// Special: Associate this thread with the message so that we can get tracking
MqMessageHandlerRegistry.register(messages.getFirst().msgId());
yield new Run(0);
}
}