(control/executor) Add new configuration options for node
It's now possible to configure prod instance to not retain processed data.
This commit is contained in:
parent
2b3c167845
commit
d76d926c38
@ -17,7 +17,6 @@ import java.io.OutputStream;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class ExecutorClient extends AbstractDynamicClient {
|
||||
@ -45,18 +44,9 @@ public class ExecutorClient extends AbstractDynamicClient {
|
||||
public void triggerConvert(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/process/convert/" + fid.id(), "").blockingSubscribe();
|
||||
}
|
||||
@Deprecated
|
||||
public void triggerConvert(Context ctx, int node, String fid) {
|
||||
post(ctx, node, "/process/convert/" + fid, "").blockingSubscribe();
|
||||
}
|
||||
|
||||
public void triggerProcessAndLoad(Context ctx, int node, String fid) {
|
||||
post(ctx, node, "/process/convert-load/" + fid, "").blockingSubscribe();
|
||||
}
|
||||
|
||||
@Deprecated
|
||||
public void loadProcessedData(Context ctx, int node, String fid) {
|
||||
loadProcessedData(ctx, node, new LoadParameters(List.of(new FileStorageId(Long.parseLong(fid)))));
|
||||
public void triggerConvertAndLoad(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/process/convert-load/" + fid.id(), "").blockingSubscribe();
|
||||
}
|
||||
|
||||
public void loadProcessedData(Context ctx, int node, LoadParameters ids) {
|
||||
|
@ -325,7 +325,7 @@ public class FileStorageService {
|
||||
type,
|
||||
createDateTime,
|
||||
path,
|
||||
state,
|
||||
FileStorageState.parse(state),
|
||||
description
|
||||
);
|
||||
}
|
||||
@ -380,7 +380,7 @@ public class FileStorageService {
|
||||
type,
|
||||
createDateTime,
|
||||
path,
|
||||
state,
|
||||
FileStorageState.parse(state),
|
||||
description
|
||||
);
|
||||
}
|
||||
@ -441,7 +441,7 @@ public class FileStorageService {
|
||||
type,
|
||||
createDateTime,
|
||||
path,
|
||||
state,
|
||||
FileStorageState.parse(state),
|
||||
description
|
||||
));
|
||||
}
|
||||
@ -454,20 +454,21 @@ public class FileStorageService {
|
||||
}
|
||||
|
||||
public void flagFileForDeletion(FileStorageId id) throws SQLException {
|
||||
setFileStorageState(id, "DELETE");
|
||||
setFileStorageState(id, FileStorageState.DELETE);
|
||||
}
|
||||
|
||||
public void enableFileStorage(FileStorageId id) throws SQLException {
|
||||
setFileStorageState(id, "ACTIVE");
|
||||
setFileStorageState(id, FileStorageState.ACTIVE);
|
||||
}
|
||||
public void disableFileStorage(FileStorageId id) throws SQLException {
|
||||
setFileStorageState(id, "");
|
||||
setFileStorageState(id, FileStorageState.UNSET);
|
||||
}
|
||||
|
||||
private void setFileStorageState(FileStorageId id, String state) throws SQLException {
|
||||
public void setFileStorageState(FileStorageId id, FileStorageState state) throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var flagStmt = conn.prepareStatement("UPDATE FILE_STORAGE SET STATE = ? WHERE ID = ?")) {
|
||||
flagStmt.setString(1, state);
|
||||
String value = state == FileStorageState.UNSET ? "" : state.name();
|
||||
flagStmt.setString(1, value);
|
||||
flagStmt.setLong(2, id.id());
|
||||
flagStmt.executeUpdate();
|
||||
}
|
||||
@ -480,6 +481,7 @@ public class FileStorageService {
|
||||
INNER JOIN FILE_STORAGE_BASE ON BASE_ID=FILE_STORAGE_BASE.ID
|
||||
SET FILE_STORAGE.STATE = ''
|
||||
WHERE FILE_STORAGE.TYPE = ?
|
||||
AND FILE_STORAGE.TYPE = 'ACTIVE'
|
||||
AND FILE_STORAGE_BASE.NODE=?
|
||||
""")) {
|
||||
flagStmt.setString(1, type.name());
|
||||
|
@ -13,13 +13,13 @@ import java.util.Objects;
|
||||
* @param path the full path of the storage on disk
|
||||
* @param description a description of the storage
|
||||
*/
|
||||
public record FileStorage(
|
||||
public record FileStorage (
|
||||
FileStorageId id,
|
||||
FileStorageBase base,
|
||||
FileStorageType type,
|
||||
LocalDateTime createDateTime,
|
||||
String path,
|
||||
String state,
|
||||
FileStorageState state,
|
||||
String description)
|
||||
{
|
||||
|
||||
@ -40,7 +40,7 @@ public record FileStorage(
|
||||
type,
|
||||
LocalDateTime.now(),
|
||||
override,
|
||||
"OVERRIDE",
|
||||
FileStorageState.EPHEMERAL,
|
||||
"OVERRIDE:" + type.name()
|
||||
);
|
||||
}
|
||||
@ -50,7 +50,7 @@ public record FileStorage(
|
||||
}
|
||||
|
||||
public boolean isActive() {
|
||||
return "ACTIVE".equals(state);
|
||||
return FileStorageState.ACTIVE.equals(state);
|
||||
}
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
||||
|
@ -0,0 +1,16 @@
|
||||
package nu.marginalia.storage.model;
|
||||
|
||||
public enum FileStorageState {
|
||||
UNSET,
|
||||
NEW,
|
||||
ACTIVE,
|
||||
DELETE,
|
||||
EPHEMERAL;
|
||||
|
||||
public static FileStorageState parse(String value) {
|
||||
if ("".equals(value)) {
|
||||
return UNSET;
|
||||
}
|
||||
return valueOf(value);
|
||||
}
|
||||
}
|
@ -175,7 +175,7 @@ public class ControlNodeService {
|
||||
|
||||
var toConvert = fileStorageService.getOnlyActiveFileStorage(nodeId, FileStorageType.CRAWL_DATA);
|
||||
|
||||
executorClient.triggerConvert(Context.fromRequest(request),
|
||||
executorClient.triggerConvertAndLoad(Context.fromRequest(request),
|
||||
nodeId,
|
||||
toConvert.orElseThrow(AssertionError::new));
|
||||
|
||||
|
@ -15,6 +15,7 @@ import nu.marginalia.process.ProcessService;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageBaseType;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import nu.marginalia.storage.model.FileStorageState;
|
||||
import nu.marginalia.storage.model.FileStorageType;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
@ -97,6 +98,7 @@ public class ConvertActor extends AbstractActorPrototype {
|
||||
FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Data; " + toProcess.description());
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.EPHEMERAL);
|
||||
storageService.relateFileStorages(toProcess.id(), processedArea.id());
|
||||
|
||||
// Pre-send convert request
|
||||
|
@ -10,8 +10,11 @@ import nu.marginalia.actor.ActorStateFactory;
|
||||
import nu.marginalia.actor.prototype.AbstractActorPrototype;
|
||||
import nu.marginalia.actor.state.ActorResumeBehavior;
|
||||
import nu.marginalia.actor.state.ActorState;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.process.ProcessOutboxes;
|
||||
import nu.marginalia.process.ProcessService;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import nu.marginalia.storage.model.FileStorageState;
|
||||
import nu.marginalia.svc.BackupService;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageBaseType;
|
||||
@ -58,6 +61,9 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
private final FileStorageService storageService;
|
||||
private final BackupService backupService;
|
||||
private final Gson gson;
|
||||
private final NodeConfigurationService nodeConfigurationService;
|
||||
|
||||
private final int nodeId;
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
|
||||
|
||||
@ -81,7 +87,9 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
FileStorageService storageService,
|
||||
IndexClient indexClient,
|
||||
BackupService backupService,
|
||||
Gson gson
|
||||
Gson gson,
|
||||
NodeConfigurationService nodeConfigurationService,
|
||||
ServiceConfiguration serviceConfiguration
|
||||
)
|
||||
{
|
||||
super(stateFactory);
|
||||
@ -93,6 +101,9 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
this.storageService = storageService;
|
||||
this.backupService = backupService;
|
||||
this.gson = gson;
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
|
||||
this.nodeId = serviceConfiguration.node();
|
||||
}
|
||||
|
||||
@ActorState(name = INITIAL,
|
||||
@ -130,6 +141,7 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data",
|
||||
"Processed Data; " + toProcess.description());
|
||||
|
||||
storageService.setFileStorageState(processedArea.id(), FileStorageState.EPHEMERAL);
|
||||
storageService.relateFileStorages(toProcess.id(), processedArea.id());
|
||||
|
||||
// Pre-send convert request
|
||||
@ -178,12 +190,31 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
|
||||
}
|
||||
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId);
|
||||
|
||||
if (rsp.state() != MqMessageState.OK)
|
||||
if (rsp.state() != MqMessageState.OK) {
|
||||
error("Loader failed");
|
||||
|
||||
} else {
|
||||
cleanProcessedStorage(message.processedStorageId);
|
||||
}
|
||||
return message;
|
||||
}
|
||||
|
||||
private void cleanProcessedStorage(List<FileStorageId> processedStorageId) {
|
||||
try {
|
||||
var config = nodeConfigurationService.get(nodeId);
|
||||
if (!config.autoClean())
|
||||
return;
|
||||
|
||||
for (var id : processedStorageId) {
|
||||
if (FileStorageState.EPHEMERAL.equals(storageService.getStorage(id).state())) {
|
||||
storageService.flagFileForDeletion(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
catch (SQLException ex) {
|
||||
logger.error("Error in clean-up", ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ActorState(
|
||||
name = BACKUP,
|
||||
next = REPARTITION,
|
||||
|
@ -42,7 +42,9 @@ public class ProcessingService {
|
||||
}
|
||||
|
||||
public Object startConversion(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT, FileStorageId.parse(request.params("fid")));
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT,
|
||||
ConvertActor.CONVERT,
|
||||
FileStorageId.parse(request.params("fid")));
|
||||
|
||||
return "";
|
||||
}
|
||||
|
@ -93,30 +93,14 @@ public class ExecutorSvcApiIntegrationTest {
|
||||
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.RECRAWL), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void triggerConvert() throws Exception {
|
||||
testInstances.client.triggerConvert(Context.internal(), 0, "1");
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT"), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void triggerProcessAndLoad() throws Exception {
|
||||
testInstances.client.triggerProcessAndLoad(Context.internal(), 0, "1");
|
||||
testInstances.client.triggerConvertAndLoad(Context.internal(), 0, FileStorageId.of(1));
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CONVERT_AND_LOAD), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void loadProcessedData() throws Exception {
|
||||
testInstances.client.loadProcessedData(Context.internal(), 0, "1");
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).startFrom(
|
||||
eq(ExecutorActor.CONVERT_AND_LOAD),
|
||||
eq("LOAD"),
|
||||
any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void calculateAdjacencies() throws Exception {
|
||||
testInstances.client.calculateAdjacencies(Context.internal(), 0);
|
||||
|
Loading…
Reference in New Issue
Block a user