(control) Better use of FS states, fix bug with start/stop actors
This commit is contained in:
parent
10fc489822
commit
c6abcd91fa
@ -40,7 +40,7 @@ public record FileStorage (
|
||||
type,
|
||||
LocalDateTime.now(),
|
||||
override,
|
||||
FileStorageState.EPHEMERAL,
|
||||
FileStorageState.UNSET,
|
||||
"OVERRIDE:" + type.name()
|
||||
);
|
||||
}
|
||||
@ -52,6 +52,15 @@ public record FileStorage (
|
||||
public boolean isActive() {
|
||||
return FileStorageState.ACTIVE.equals(state);
|
||||
}
|
||||
public boolean isNoState() {
|
||||
return FileStorageState.UNSET.equals(state);
|
||||
}
|
||||
public boolean isDelete() {
|
||||
return FileStorageState.DELETE.equals(state);
|
||||
}
|
||||
public boolean isNew() {
|
||||
return FileStorageState.NEW.equals(state);
|
||||
}
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
if (this == o) return true;
|
||||
|
@ -4,8 +4,7 @@ public enum FileStorageState {
|
||||
UNSET,
|
||||
NEW,
|
||||
ACTIVE,
|
||||
DELETE,
|
||||
EPHEMERAL;
|
||||
DELETE;
|
||||
|
||||
public static FileStorageState parse(String value) {
|
||||
if ("".equals(value)) {
|
||||
|
@ -101,8 +101,22 @@ public class ControlNodeService {
|
||||
Spark.post("/public/nodes/:id/storage/:fid/disable", this::disableFileStorage);
|
||||
Spark.get("/public/nodes/:id/storage/:fid/transfer", this::downloadFileFromStorage);
|
||||
|
||||
|
||||
Spark.post("/public/nodes/:id/fsms/:fsm/start", this::startFsm);
|
||||
Spark.post("/public/nodes/:id/fsms/:fsm/stop", this::stopFsm);
|
||||
}
|
||||
|
||||
public Object startFsm(Request req, Response rsp) throws Exception {
|
||||
executorClient.startFsm(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fsm").toUpperCase());
|
||||
|
||||
return redirectToOverview(req);
|
||||
}
|
||||
|
||||
public Object stopFsm(Request req, Response rsp) throws Exception {
|
||||
executorClient.stopFsm(Context.fromRequest(req), Integer.parseInt(req.params("node")), req.params("fsm").toUpperCase());
|
||||
|
||||
return redirectToOverview(req);
|
||||
}
|
||||
private Object nodeListModel(Request request, Response response) throws SQLException {
|
||||
var configs = nodeConfigurationService.getAll();
|
||||
|
||||
|
@ -107,12 +107,20 @@
|
||||
<tr>
|
||||
<td title="{{timestampFull}}">{{timestamp}}</td>
|
||||
<td>
|
||||
{{#unless storage.active}}
|
||||
{{#if storage.delete}}
|
||||
<span title="This item will be deleted">DELETING</span>
|
||||
{{/if}}
|
||||
|
||||
{{#if storage.new}}
|
||||
<span title="This item is being created">NEW</span>
|
||||
{{/if}}
|
||||
|
||||
{{#if storage.noState}}
|
||||
<div class="form-check form-switch">
|
||||
<label class="form-check-label" for="flexSwitchCheckDefault">Archived</label>
|
||||
<input class="form-check-input" type="checkbox" id="flexSwitchCheckDefault" onclick="setActive({{storage.id}})">
|
||||
</div>
|
||||
{{/unless}}
|
||||
{{/if}}
|
||||
|
||||
{{#if storage.active}}
|
||||
<div class="form-check form-switch">
|
||||
|
@ -55,6 +55,8 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
public long loaderMsgId = 0L;
|
||||
};
|
||||
|
||||
public record WaitInstructions(long msgId, FileStorageId dest) { }
|
||||
|
||||
@Override
|
||||
public String describe() {
|
||||
return "Convert a set of crawl data into a format suitable for loading into the database.";
|
||||
@ -98,8 +100,8 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Data; " + toProcess.description());
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.EPHEMERAL);
|
||||
storageService.relateFileStorages(toProcess.id(), processedArea.id());
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
|
||||
|
||||
// Pre-send convert request
|
||||
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
|
||||
@ -132,6 +134,8 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Encylopedia Data; " + fileName);
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
|
||||
|
||||
// Pre-send convert request
|
||||
var request = new ConvertRequest(ConvertAction.SideloadEncyclopedia,
|
||||
sourcePath.toString(),
|
||||
@ -164,6 +168,8 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Dirtree Data; " + fileName);
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
|
||||
|
||||
// Pre-send convert request
|
||||
var request = new ConvertRequest(ConvertAction.SideloadDirtree,
|
||||
sourcePath.toString(),
|
||||
@ -195,6 +201,8 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Stackexchange Data; " + fileName);
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
|
||||
|
||||
// Pre-send convert request
|
||||
var request = new ConvertRequest(ConvertAction.SideloadStackexchange,
|
||||
sourcePath.toString(),
|
||||
@ -212,12 +220,15 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
Wait for the converter to finish processing the data.
|
||||
"""
|
||||
)
|
||||
public void convertWait(Long msgId) throws Exception {
|
||||
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
|
||||
public void convertWait(WaitInstructions instructions) throws Exception {
|
||||
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, instructions.msgId());
|
||||
|
||||
if (rsp.state() != MqMessageState.OK)
|
||||
if (rsp.state() != MqMessageState.OK) {
|
||||
error("Converter failed");
|
||||
}
|
||||
|
||||
storageService.setFileStorageState(instructions.dest, FileStorageState.UNSET);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -141,7 +141,7 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Data; " + toProcess.description());
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.EPHEMERAL);
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
|
||||
storageService.relateFileStorages(toProcess.id(), processedArea.id());
|
||||
|
||||
// Pre-send convert request
|
||||
@ -201,13 +201,16 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
private void cleanProcessedStorage(List<FileStorageId> processedStorageId) {
|
||||
try {
|
||||
var config = nodeConfigurationService.get(nodeId);
|
||||
if (!config.autoClean())
|
||||
return;
|
||||
|
||||
for (var id : processedStorageId) {
|
||||
if (FileStorageState.EPHEMERAL.equals(storageService.getStorage(id).state())) {
|
||||
if (FileStorageState.NEW.equals(storageService.getStorage(id).state())) {
|
||||
if (config.autoClean()) {
|
||||
storageService.flagFileForDeletion(id);
|
||||
}
|
||||
else {
|
||||
storageService.setFileStorageState(id, FileStorageState.UNSET);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
|
Loading…
Reference in New Issue
Block a user