(control) Add a 'cancel' button to the process list

This is a very nice QoL improvement, since it means you don't have to dig in the Actors view to terminate processes.
This commit is contained in:
Viktor Lofgren 2024-01-10 15:02:42 +01:00
parent f310ad8d98
commit f44222ce53
8 changed files with 89 additions and 10 deletions

View File

@ -33,6 +33,11 @@ public class ExecutorClient extends AbstractDynamicClient {
post(ctx, node, "/actor/"+actorName+"/stop", "").blockingSubscribe();
}
public void stopProcess(Context ctx, int node, String id) {
post(ctx, node, "/process/" + id + "/stop", "").blockingSubscribe();
}
public void triggerCrawl(Context ctx, int node, String fid) {
post(ctx, node, "/process/crawl/" + fid, "").blockingSubscribe();
}

View File

@ -226,7 +226,7 @@ public class MqPersistence {
/** Return up to n unprocessed messages from the specified inbox that are in states 'NEW' or 'ACK'
* without updating their ownership information
*/
public Collection<MqMessage> eavesdrop(String inboxName, int n) throws SQLException {
public SequencedCollection<MqMessage> eavesdrop(String inboxName, int n) throws SQLException {
try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement("""
SELECT

View File

@ -87,6 +87,9 @@ public class ControlNodeService {
Spark.get("/public/nodes/:id/storage/conf", this::nodeStorageConfModel, storageConfRenderer::render);
Spark.get("/public/nodes/:id/storage/details", this::nodeStorageDetailsModel, storageDetailsRenderer::render);
Spark.post("/public/nodes/:id/process/:processBase/stop", this::stopProcess,
redirectControl.renderRedirectAcknowledgement("Stopping", "../..")
);
Spark.get("/public/nodes/:id/storage/new-specs", this::newSpecsModel, newSpecsFormRenderer::render);
Spark.post("/public/nodes/:id/storage/new-specs", this::createNewSpecsAction,
redirectControl.renderRedirectAcknowledgement("Creating", ".")
@ -150,6 +153,7 @@ public class ControlNodeService {
return redirectToOverview(req);
}
private Object nodeListModel(Request request, Response response) throws SQLException {
var configs = nodeConfigurationService.getAll();
@ -167,6 +171,15 @@ public class ControlNodeService {
return "";
}
private Object stopProcess(Request request, Response response) {
int nodeId = Integer.parseInt(request.params("id"));
String processBase = request.params("processBase");
executorClient.stopProcess(Context.fromRequest(request), nodeId, processBase);
return "";
}
private Object triggerRestoreBackup(Request request, Response response) {
int nodeId = Integer.parseInt(request.params("id"));

View File

@ -7,6 +7,7 @@
<th>UUID</th>
<th>Status</th>
<th>Progress</th>
<th>Action</th>
</tr>
{{#each processes}}
<tr class="{{#if isMissing}}table-danger{{/if}}">
@ -14,6 +15,15 @@
<td>{{{readableUUID uuidFull}}}</td>
<td>{{status}}</td>
<td style="{{progressStyle}}">{{#if progress}}{{progress}}%{{/if}}</td>
<td>
<form action="/nodes/{{node}}/process/{{processBase}}/stop" method="post">
<button
type="submit"
class="btn btn-danger btn-sm"
onclick="return confirm('Confirm stopping process {{displayName}}')"
{{#if isMissing}}disabled{{/if}}>Cancel</button>
</form>
</td>
</tr>
{{/each}}
{{#unless processes}}

View File

@ -2,6 +2,11 @@ package nu.marginalia.actor;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.proc.ProcessLivenessMonitorActor;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
@ -11,10 +16,20 @@ import spark.Spark;
@Singleton
public class ActorApi {
private final ExecutorActorControlService actors;
private final ProcessService processService;
private final MqPersistence mqPersistence;
private final ServiceConfiguration serviceConfiguration;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public ActorApi(ExecutorActorControlService actors) {
public ActorApi(ExecutorActorControlService actors,
ProcessService processService,
MqPersistence mqPersistence,
ServiceConfiguration serviceConfiguration)
{
this.actors = actors;
this.processService = processService;
this.mqPersistence = mqPersistence;
this.serviceConfiguration = serviceConfiguration;
}
public Object startActorFromState(Request request, Response response) throws Exception {
@ -42,6 +57,33 @@ public class ActorApi {
return "OK";
}
public Object stopProcess(Request request, Response response) {
ProcessService.ProcessId id = ProcessService.translateExternalIdBase(request.params("id"));
try {
String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
var lastMessages = mqPersistence.eavesdrop(inbox, 1);
// If there are any messages in the inbox, we mark them as dead to prevent
// the process spawner from reviving the process immediately
if (null != lastMessages && !lastMessages.isEmpty()) {
var lastMessage = lastMessages.getFirst();
if (lastMessage.state() == MqMessageState.ACK) {
mqPersistence.updateMessageState(lastMessages.getFirst().msgId(), MqMessageState.DEAD);
}
}
processService.kill(id);
}
catch (Exception ex) {
logger.error("Failed to stop process {}", id, ex);
}
return "OK";
}
public ExecutorActor translateActor(String name) {
try {
return ExecutorActor.valueOf(name.toUpperCase());

View File

@ -196,6 +196,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
throw new RuntimeException(ex);
}
}
private record ProcessHeartbeat(
String processId,
String processBase,
@ -208,14 +209,7 @@ public class ProcessLivenessMonitorActor extends RecordActorPrototype {
return "RUNNING".equals(status);
}
public ProcessService.ProcessId getProcessId() {
return switch (processBase) {
case "converter" -> ProcessService.ProcessId.CONVERTER;
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR;
default -> null;
};
return ProcessService.translateExternalIdBase(processBase);
}
}

View File

@ -52,8 +52,11 @@ public class ExecutorSvc extends Service {
Spark.post("/actor/:id/start", actorApi::startActor);
Spark.post("/actor/:id/start/:state", actorApi::startActorFromState);
Spark.post("/actor/:id/stop", actorApi::stopActor);
Spark.get("/actor", this::getActorStates, gson::toJson);
Spark.post("/process/:id/stop", actorApi::stopProcess);
Spark.post("/process/crawl/:fid", processingService::startCrawl);
Spark.post("/process/recrawl", processingService::startRecrawl);
Spark.post("/process/convert/:fid", processingService::startConversion);

View File

@ -30,6 +30,18 @@ public class ProcessService {
private final ConcurrentHashMap<ProcessId, Process> processes = new ConcurrentHashMap<>();
public static ProcessService.ProcessId translateExternalIdBase(String id) {
return switch (id) {
case "converter" -> ProcessService.ProcessId.CONVERTER;
case "crawler" -> ProcessService.ProcessId.CRAWLER;
case "loader" -> ProcessService.ProcessId.LOADER;
case "website-adjacencies-calculator" -> ProcessService.ProcessId.ADJACENCIES_CALCULATOR;
case "index-constructor" -> ProcessService.ProcessId.INDEX_CONSTRUCTOR;
default -> null;
};
}
public enum ProcessId {
CRAWLER("crawler-process/bin/crawler-process"),
CONVERTER("converter-process/bin/converter-process"),