(actor) Rewrite of the actor prototype class using record pattern matching

This commit is contained in:
Viktor Lofgren 2023-10-23 10:18:20 +02:00
parent 119151cad3
commit 2ed2f35a9b
44 changed files with 1132 additions and 2319 deletions

View File

@ -1,144 +0,0 @@
package nu.marginalia.actor;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;
import com.google.inject.Inject;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorStateTransition;
import java.util.function.Function;
import java.util.function.Supplier;
/** Factory for creating actor state instances. You probably don't want to use this directly.
* <p>
* Use AbstractStatePrototype instead. */
public class ActorStateFactory {
private final Gson gson;
@Inject
public ActorStateFactory(Gson gson) {
this.gson = gson;
}
public <T> ActorStateInstance create(String name, ActorResumeBehavior resumeBehavior, Class<T> param, Function<T, ActorStateTransition> logic) {
return new ActorStateInstance() {
@Override
public String name() {
return name;
}
@Override
public ActorStateTransition next(String message) {
if (message.isEmpty()) {
return logic.apply(null);
}
try {
var paramObj = gson.fromJson(message, param);
return logic.apply(paramObj);
}
catch (JsonSyntaxException ex) {
throw new IllegalArgumentException("Failed to parse '" + message +
"' into a '" + param.getSimpleName() + "'", ex);
}
}
@Override
public ActorResumeBehavior resumeBehavior() {
return resumeBehavior;
}
@Override
public boolean isFinal() {
return false;
}
};
}
public ActorStateInstance create(String name, ActorResumeBehavior actorResumeBehavior, Supplier<ActorStateTransition> logic) {
return new ActorStateInstance() {
@Override
public String name() {
return name;
}
@Override
public ActorStateTransition next(String message) {
return logic.get();
}
@Override
public ActorResumeBehavior resumeBehavior() {
return actorResumeBehavior;
}
@Override
public boolean isFinal() {
return false;
}
};
}
public ActorStateTransition transition(String state) {
return ActorStateTransition.to(state);
}
public ActorStateTransition transition(String state, Object message) {
if (null == message) {
return ActorStateTransition.to(state);
}
return ActorStateTransition.to(state, gson.toJson(message));
}
static class ErrorStateInstance implements ActorStateInstance {
@Override
public String name() { return "ERROR"; }
@Override
public ActorStateTransition next(String message) {
throw new UnsupportedOperationException();
}
@Override
public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; }
@Override
public boolean isFinal() { return true; }
}
static class FinalState implements ActorStateInstance {
@Override
public String name() { return "END"; }
@Override
public ActorStateTransition next(String message) {
throw new UnsupportedOperationException();
}
@Override
public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; }
@Override
public boolean isFinal() { return true; }
}
static class ResumingState implements ActorStateInstance {
@Override
public String name() { return "RESUMING"; }
@Override
public ActorStateTransition next(String message) {
throw new UnsupportedOperationException();
}
@Override
public ActorResumeBehavior resumeBehavior() { return ActorResumeBehavior.RETRY; }
@Override
public boolean isFinal() { return false; }
}
}

View File

@ -34,9 +34,9 @@ public class ActorStateMachine {
private volatile ExpectedMessage expectedMessage = ExpectedMessage.anyUnrelated();
private final ActorStateInstance errorState = new ActorStateFactory.ErrorStateInstance();
private final ActorStateInstance finalState = new ActorStateFactory.FinalState();
private final ActorStateInstance resumingState = new ActorStateFactory.ResumingState();
private final ActorStateInstance errorState = new ErrorState();
private final ActorStateInstance finalState = new FinalState();
private final ActorStateInstance resumingState = new ResumingState();
private final List<BiConsumer<String, String>> stateChangeListeners = new ArrayList<>();
private final Map<String, ActorStateInstance> allStates = new HashMap<>();
@ -60,20 +60,6 @@ public class ActorStateMachine {
registerStates(statePrototype);
isDirectlyInitializable = statePrototype.isDirectlyInitializable();
statePrototype.declaredStates().forEach((name, declaredState) -> {
if (!allStates.containsKey(name)) {
throw new IllegalArgumentException("State " + name + " is not defined in the state graph");
}
if (!allStates.containsKey(declaredState.next())) {
throw new IllegalArgumentException("State " + declaredState.next() + " is not defined in the state graph");
}
for (var state : declaredState.transitions()) {
if (!allStates.containsKey(state)) {
throw new IllegalArgumentException("State " + state + " is not defined in the state graph");
}
}
});
resume();
smInbox.start();
@ -217,7 +203,10 @@ public class ActorStateMachine {
MqMessage message)
{
try {
if (resumeState.resumeBehavior().equals(ActorResumeBehavior.ERROR)) {
if (resumeState == null) {
// This is primarily something that happens during migrations
smOutbox.sendNotice(expectedMessage.id, "ERROR", "Resumption from unknown ACK'ed state " + message.function());
} else if (resumeState.resumeBehavior().equals(ActorResumeBehavior.ERROR)) {
// The message is acknowledged, but the state does not support resuming
smOutbox.sendNotice(expectedMessage.id, "ERROR", "Illegal resumption from ACK'ed state " + message.function());
}
@ -370,5 +359,71 @@ public class ActorStateMachine {
}
}
}
private static class ErrorState implements ActorStateInstance {
@Override
public String name() {
return "ERROR";
}
@Override
public ActorStateTransition next(String message) {
throw new UnsupportedOperationException();
}
@Override
public ActorResumeBehavior resumeBehavior() {
return ActorResumeBehavior.RETRY;
}
@Override
public boolean isFinal() {
return true;
}
}
private static class FinalState implements ActorStateInstance {
@Override
public String name() {
return "END";
}
@Override
public ActorStateTransition next(String message) {
throw new UnsupportedOperationException();
}
@Override
public ActorResumeBehavior resumeBehavior() {
return ActorResumeBehavior.RETRY;
}
@Override
public boolean isFinal() {
return true;
}
}
private static class ResumingState implements ActorStateInstance {
@Override
public String name() {
return "RESUMING";
}
@Override
public ActorStateTransition next(String message) {
throw new UnsupportedOperationException();
}
@Override
public ActorResumeBehavior resumeBehavior() {
return ActorResumeBehavior.RETRY;
}
@Override
public boolean isFinal() {
return false;
}
}
}

View File

@ -1,237 +0,0 @@
package nu.marginalia.actor.prototype;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorTerminalState;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorStateTransition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.*;
/** Base class for actors. The state graph is defined using public methods
* annotated with {@code @ActorState} and {@code @ActorTerminalState}. This class provide
* a mediation layer that translates these annotations into a state graph
* that can be used by the actor runtime.
* <p> . <p>
* <pre>
* public class MyActor extends AbstractActorPrototype {
* {@code @ActorState(name="INITIAL", next="STATE_1")}
* public void initial() { ... }
* {@code @ActorState(name="STATE_1", next="STATE_N")}
* public void state1() { ... }
* ...
* }
* </pre>
* <p>
* The prototype provides explicit transition() and error() methods that can be used
* to jump to a different state. Each of these methods come with a variant that has a
* parameter. The parameter will be passed as a payload to the next state.
* </p>
* <p>The @ActorState annotation also provides a default next
* state that will be transitioned to automatically when the method returns. If the
* method returns a value, this value will be passed as a payload to the next state,
* and injected as a parameter to the handler method.</p>
* <h2>Caveat</h2>
* The jump functions are implemented using exceptions. This means that if you have
* a {@code try {} catch(Exception e)} block in your code or a {@code @SneakyThrows}
* annotation, you will catch the exception and prevent the transition.
*
*/
public abstract class AbstractActorPrototype implements ActorPrototype {
private final ActorStateFactory stateFactory;
private static final Logger logger = LoggerFactory.getLogger(AbstractActorPrototype.class);
public AbstractActorPrototype(ActorStateFactory stateFactory) {
this.stateFactory = stateFactory;
}
/** Explicitly transition to a different state.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public void transition(String state) {
throw new ControlFlowException(state, null);
}
/** Explicitly transition to a different state, encoding a payload.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public <T> void transition(String state, T payload) {
throw new ControlFlowException(state, payload);
}
/** Explicitly transition to the error state.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public void error() {
throw new ControlFlowException("ERROR", "");
}
/** Explicitly transition to the error state with an error message.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public <T> void error(T payload) {
throw new ControlFlowException("ERROR", payload);
}
/** Explicitly transition to the error state.
* <p>
* Caveat: This is implemented via an exception. Mind your catch statements. */
public void error(Exception ex) {
throw new ControlFlowException("ERROR", ex.getClass().getSimpleName() + ":" + ex.getMessage());
}
@Override
public boolean isDirectlyInitializable() {
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(ActorState.class);
if (gs == null) {
continue;
}
if ("INITIAL".equals(gs.name()) && method.getParameterCount() == 0) {
return true;
}
}
return false;
}
@Override
public Map<String, ActorState> declaredStates() {
Map<String, ActorState> ret = new HashMap<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(ActorState.class);
if (gs != null) {
ret.put(gs.name(), gs);
}
}
return ret;
}
/** Compile a list of ActorStateInstances from the @ActorState and @ActorTerminalState annotations.
*/
@Override
public List<ActorStateInstance> asStateList() {
List<ActorStateInstance> ret = new ArrayList<>();
for (var method : getClass().getMethods()) {
var gs = method.getAnnotation(ActorState.class);
if (gs != null) {
ret.add(createStateInstance(method, gs));
}
var ts = method.getAnnotation(ActorTerminalState.class);
if (ts != null) {
ret.add(createTerminalStateInstance(ts));
}
}
return ret;
}
private ActorStateInstance createStateInstance(Method method, ActorState gs) {
var parameters = method.getParameterTypes();
boolean returnsVoid = method.getGenericReturnType().equals(Void.TYPE);
if (parameters.length == 0) {
return stateFactory.create(gs.name(), gs.resume(), () -> {
try {
if (returnsVoid) {
method.invoke(this);
return ActorStateTransition.to(gs.next());
} else {
Object ret = method.invoke(this);
return stateFactory.transition(gs.next(), ret);
}
}
catch (Exception e) {
return translateInvocationExceptionToStateTransition(gs.name(), e);
}
});
}
else if (parameters.length == 1) {
return stateFactory.create(gs.name(), gs.resume(), parameters[0], (param) -> {
try {
if (returnsVoid) {
method.invoke(this, param);
return ActorStateTransition.to(gs.next());
} else {
Object ret = method.invoke(this, param);
return stateFactory.transition(gs.next(), ret);
}
}
catch (Exception e) {
return translateInvocationExceptionToStateTransition(gs.name(), e);
}
});
}
else {
// We permit only @ActorState-annotated methods like this:
//
// void foo();
// void foo(Object bar);
// Object foo();
// Object foo(Object bar);
throw new IllegalStateException("ActorStatePrototype " +
getClass().getSimpleName() +
" has invalid method signature for method " +
method.getName() +
": Expected 0 or 1 parameter(s) but found " +
Arrays.toString(parameters));
}
}
private ActorStateInstance createTerminalStateInstance(ActorTerminalState ts) {
final String name = ts.name();
return stateFactory.create(name, ActorResumeBehavior.ERROR, () -> {
throw new ControlFlowException(name, null);
});
}
private ActorStateTransition translateInvocationExceptionToStateTransition(String state, Throwable ex) {
while (ex instanceof InvocationTargetException e) {
if (e.getCause() != null) ex = ex.getCause();
}
if (ex instanceof ControlFlowException cfe) {
return stateFactory.transition(cfe.getState(), cfe.getPayload());
}
else if (ex instanceof InterruptedException intE) {
logger.error("State execution was interrupted " + state);
return ActorStateTransition.to("ERR", "Execution interrupted");
}
else {
logger.error("Error in state invocation " + state, ex);
return ActorStateTransition.to("ERROR",
"Exception: " + ex.getClass().getSimpleName() + "/" + ex.getMessage());
}
}
/** Exception thrown by a state to indicate that the state machine should jump to a different state. */
public static class ControlFlowException extends RuntimeException {
private final String state;
private final Object payload;
public ControlFlowException(String state, Object payload) {
this.state = state;
this.payload = payload;
}
public String getState() {
return state;
}
public Object getPayload() {
return payload;
}
public StackTraceElement[] getStackTrace() { return new StackTraceElement[0]; }
}
}

View File

@ -1,12 +1,8 @@
package nu.marginalia.actor.prototype;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorTerminalState;
import java.util.List;
import java.util.Map;
import java.util.Set;
public interface ActorPrototype {
/**
@ -18,8 +14,6 @@ public interface ActorPrototype {
* without declared parameters. */
boolean isDirectlyInitializable();
Map<String, ActorState> declaredStates();
/** Get or create a list of ActorStateInstances */
List<ActorStateInstance> asStateList();
}

View File

@ -0,0 +1,132 @@
package nu.marginalia.actor.prototype;
import com.google.gson.Gson;
import nu.marginalia.actor.state.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
public abstract class RecordActorPrototype implements ActorPrototype {
private final Gson gson;
private static final Logger logger = LoggerFactory.getLogger(ActorPrototype.class);
public RecordActorPrototype(Gson gson) {
this.gson = gson;
}
@Terminal
public record End() implements ActorStep {}
@Terminal
public record Error(String message) implements ActorStep {
public Error() { this(""); }
}
/** Implements the actor graph transitions.
* The return value of this method will be persisted into the database message queue
* and loaded back again before execution.
* */
public abstract ActorStep transition(ActorStep self) throws Exception;
@Override
public abstract String describe();
@Override
public boolean isDirectlyInitializable() {
// An actor is Directly Initializable if it has a state called Initial with a zero-argument constructor
for (Class<?> clazz = getClass();
RecordActorPrototype.class.isAssignableFrom(clazz);
clazz = clazz.getSuperclass()) {
if (Arrays.stream(clazz.getDeclaredClasses())
.filter(declaredClazz -> declaredClazz.getSimpleName().equals("Initial"))
.flatMap(declaredClazz -> Arrays.stream(declaredClazz.getDeclaredConstructors()))
.anyMatch(con -> con.getParameterCount() == 0)) {
return true;
}
}
return false;
}
@Override
public List<ActorStateInstance> asStateList() {
List<ActorStateInstance> steps = new ArrayList<>();
// Look for member classes that instantiate ActorStep in this class and all parent classes up until
// RecordActorPrototype
for (Class<?> clazz = getClass();
RecordActorPrototype.class.isAssignableFrom(clazz);
clazz = clazz.getSuperclass())
{
for (var stepClass : clazz.getDeclaredClasses()) {
if (!ActorStep.class.isAssignableFrom(stepClass))
continue;
steps.add(new StepInstance((Class<? extends ActorStep>) stepClass));
}
}
return steps;
}
private class StepInstance implements ActorStateInstance {
private final Class<? extends ActorStep> stepClass;
public StepInstance(Class<? extends ActorStep> stepClass) {
this.stepClass = stepClass;
}
@Override
public String name() {
return stepClass.getSimpleName().toUpperCase();
}
@Override
public ActorStateTransition next(String message) {
try {
ActorStep dest;
if (null == message || message.isBlank()) {
dest = stepClass.getDeclaredConstructor().newInstance();
} else {
dest = gson.fromJson(message, stepClass);
}
dest = transition(dest);
return new ActorStateTransition(
dest.getClass().getSimpleName().toUpperCase(),
gson.toJson(dest)
);
} catch (ActorControlFlowException cfe) {
return new ActorStateTransition(
Error.class.getSimpleName(),
gson.toJson(new Error(cfe.getMessage()))
);
} catch (Exception ex) {
logger.error("Error in transition handler, decoding {}:'{}'", stepClass.getSimpleName(), message);
logger.error("Exception was", ex);
return new ActorStateTransition("ERROR", ex.getMessage());
}
}
@Override
public ActorResumeBehavior resumeBehavior() {
var behavior = stepClass.getAnnotation(Resume.class);
if (null == behavior)
return ActorResumeBehavior.ERROR;
return behavior.behavior();
}
@Override
public boolean isFinal() {
return stepClass.getAnnotation(Terminal.class) != null;
}
}
}

View File

@ -0,0 +1,10 @@
package nu.marginalia.actor.state;
/** Throwing this exception within RecordActorPrototype's transition method is equivalent to
* yielding new Error(message).
*/
public class ActorControlFlowException extends Exception {
public ActorControlFlowException(String message) {
super(message);
}
}

View File

@ -1,15 +0,0 @@
package nu.marginalia.actor.state;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
/** Annotation for declaring a state in an actor's state graph. */
@Retention(RetentionPolicy.RUNTIME)
public @interface ActorState {
String name();
String next() default "ERROR";
String[] transitions() default {};
String description() default "";
ActorResumeBehavior resume() default ActorResumeBehavior.ERROR;
}

View File

@ -0,0 +1,3 @@
package nu.marginalia.actor.state;
public interface ActorStep {}

View File

@ -1,10 +1,10 @@
package nu.marginalia.actor.state;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
public @interface ActorTerminalState {
String name();
String description() default "";
public @interface Resume {
ActorResumeBehavior behavior();
}

View File

@ -0,0 +1,8 @@
package nu.marginalia.actor.state;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
@Retention(RetentionPolicy.RUNTIME)
public @interface Terminal {
}

View File

@ -1,13 +1,16 @@
package nu.marginalia.actor;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.actor.state.ActorState;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer;
@ -17,13 +20,12 @@ import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@Tag("slow")
@Testcontainers
@Execution(SAME_THREAD)
public class ActorStateMachineTest {
public class ActorRecordMachineTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
@ -58,39 +60,47 @@ public class ActorStateMachineTest {
dataSource.close();
}
public static class TestPrototypeActor extends AbstractActorPrototype {
public TestPrototypeActor(ActorStateFactory stateFactory) {
super(stateFactory);
public static class TestPrototypeActor extends RecordActorPrototype {
public TestPrototypeActor(Gson gson)
{
super(gson);
}
public record Initial() implements ActorStep {}
public record Greet(String message) implements ActorStep {}
public record CountDown(int from) implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) {
return switch (self) {
case Initial i -> new Greet("World");
case Greet(String name) -> {
System.out.println("Hello " + name + "!");
yield new CountDown(5);
}
case CountDown (int from) -> {
if (from > 0) {
System.out.println(from);
yield new CountDown(from - 1);
}
yield new End();
}
default -> new Error();
};
}
public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "GREET")
public String initial() {
return "World";
}
@ActorState(name = "GREET")
public void greet(String message) {
System.out.println("Hello, " + message + "!");
transition("COUNT-DOWN", 5);
}
@ActorState(name = "COUNT-DOWN", next = "END")
public void countDown(Integer from) {
if (from > 0) {
System.out.println(from);
transition("COUNT-DOWN", from - 1);
}
}
}
@Test
public void testAnnotatedStateGraph() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var graph = new TestPrototypeActor(stateFactory);
var graph = new TestPrototypeActor(new GsonBuilder().create());
graph.asStateList().forEach(i -> {
System.out.println(i.name());
});
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), graph);
@ -107,8 +117,7 @@ public class ActorStateMachineTest {
@Test
public void testStartStopStartStop() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new TestPrototypeActor(stateFactory));
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new TestPrototypeActor(new GsonBuilder().create()));
sm.init();
@ -117,7 +126,7 @@ public class ActorStateMachineTest {
System.out.println("-------------------- ");
var sm2 = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new TestPrototypeActor(stateFactory));
var sm2 = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new TestPrototypeActor(new GsonBuilder().create()));
sm2.join(2, TimeUnit.SECONDS);
sm2.stop();
@ -126,15 +135,13 @@ public class ActorStateMachineTest {
@Test
public void testFalseTransition() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
// Prep the queue with a message to set the state to initial,
// and an additional message to trigger the false transition back to initial
persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null);
persistence.sendNewMessage(inboxId, null, null, "INITIAL", "", null);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new TestPrototypeActor(stateFactory));
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new TestPrototypeActor(new GsonBuilder().create()));
Thread.sleep(50);

View File

@ -1,107 +0,0 @@
package nu.marginalia.actor;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessageRow;
import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorResumeBehavior;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@Tag("slow")
@Testcontainers
@Execution(SAME_THREAD)
public class ActorStateMachineErrorTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("db/migration/V23_07_0_003__message_queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;
static MqPersistence persistence;
static MessageQueueFactory messageQueueFactory;
private String inboxId;
@BeforeEach
public void setUp() {
inboxId = UUID.randomUUID().toString();
}
@BeforeAll
public static void setUpAll() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
config.setUsername("wmsa");
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
persistence = new MqPersistence(dataSource);
messageQueueFactory = new MessageQueueFactory(persistence);
}
@AfterAll
public static void tearDownAll() {
dataSource.close();
}
public static class ErrorHurdles extends AbstractActorPrototype {
public ErrorHurdles(ActorStateFactory stateFactory) {
super(stateFactory);
}
public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "FAILING")
public void initial() {
}
@ActorState(name = "FAILING", next = "OK", resume = ActorResumeBehavior.RETRY)
public void resumable() {
throw new RuntimeException("Boom!");
}
@ActorState(name = "OK", next = "END")
public void ok() {
}
}
@Test
public void smResumeResumableFromNew() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new ErrorHurdles(stateFactory));
sm.init();
sm.join(2, TimeUnit.SECONDS);
sm.stop();
List<String> states = MqTestUtil.getMessages(dataSource, inboxId, 0)
.stream()
.peek(System.out::println)
.map(MqMessageRow::function)
.toList();
assertEquals(List.of("INITIAL", "FAILING", "ERROR"), states);
}
}

View File

@ -1,101 +0,0 @@
package nu.marginalia.actor;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.actor.state.ActorState;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@Tag("slow")
@Testcontainers
@Execution(SAME_THREAD)
public class ActorStateMachineNullTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("db/migration/V23_07_0_003__message_queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;
static MqPersistence persistence;
static MessageQueueFactory messageQueueFactory;
private String inboxId;
@BeforeEach
public void setUp() {
inboxId = UUID.randomUUID().toString();
}
@BeforeAll
public static void setUpAll() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
config.setUsername("wmsa");
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
persistence = new MqPersistence(dataSource);
messageQueueFactory = new MessageQueueFactory(persistence);
}
@AfterAll
public static void tearDownAll() {
dataSource.close();
}
public static class TestPrototypeActor extends AbstractActorPrototype {
public TestPrototypeActor(ActorStateFactory stateFactory) {
super(stateFactory);
}
public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "GREET")
public void initial() {}
@ActorState(name = "GREET", next = "END")
public void greet(String message) {
if (null == message) {
System.out.println("Hello, null!");
return;
}
Assertions.fail("Should not be called");
}
}
@Test
public void testStateGraphNullSerialization() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var graph = new TestPrototypeActor(stateFactory);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), graph);
sm.registerStates(graph);
sm.init();
sm.join(2, TimeUnit.SECONDS);
sm.stop();
MqTestUtil.getMessages(dataSource, inboxId, 0).forEach(System.out::println);
}
}

View File

@ -1,191 +0,0 @@
package nu.marginalia.actor;
import com.google.gson.GsonBuilder;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessageRow;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.MqTestUtil;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorResumeBehavior;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@Tag("slow")
@Testcontainers
@Execution(SAME_THREAD)
public class ActorStateMachineResumeTest {
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("db/migration/V23_07_0_003__message_queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;
static MqPersistence persistence;
static MessageQueueFactory messageQueueFactory;
private String inboxId;
@BeforeEach
public void setUp() {
inboxId = UUID.randomUUID().toString();
}
@BeforeAll
public static void setUpAll() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
config.setUsername("wmsa");
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
persistence = new MqPersistence(dataSource);
messageQueueFactory = new MessageQueueFactory(persistence);
}
@AfterAll
public static void tearDownAll() {
dataSource.close();
}
public static class ResumeTrialsPrototypeActor extends AbstractActorPrototype {
public ResumeTrialsPrototypeActor(ActorStateFactory stateFactory) {
super(stateFactory);
}
public String describe() {
return "Test graph";
}
@ActorState(name = "INITIAL", next = "RESUMABLE")
public void initial() {}
@ActorState(name = "RESUMABLE", next = "NON-RESUMABLE", resume = ActorResumeBehavior.RETRY)
public void resumable() {}
@ActorState(name = "NON-RESUMABLE", next = "OK", resume = ActorResumeBehavior.ERROR)
public void nonResumable() {}
@ActorState(name = "OK", next = "END")
public void ok() {}
}
@Test
public void smResumeResumableFromNew() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
sendMessage(inboxId, 0, "RESUMABLE");
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS);
sm.stop();
List<String> states = MqTestUtil.getMessages(dataSource, inboxId, 0)
.stream()
.peek(System.out::println)
.map(MqMessageRow::function)
.toList();
assertEquals(List.of("RESUMABLE", "NON-RESUMABLE", "OK", "END"), states);
}
private long sendMessage(String inboxId, int node, String function) throws Exception {
return persistence.sendNewMessage(inboxId+":"+node, null, -1L, function, "", null);
}
@Test
public void smResumeFromAck() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
long id = sendMessage(inboxId, 0, "RESUMABLE");
persistence.updateMessageState(id, MqMessageState.ACK);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(4, TimeUnit.SECONDS);
sm.stop();
List<String> states = MqTestUtil.getMessages(dataSource, inboxId, 0)
.stream()
.peek(System.out::println)
.map(MqMessageRow::function)
.toList();
assertEquals(List.of("RESUMABLE", "RESUMABLE", "NON-RESUMABLE", "OK", "END"), states);
}
@Test
public void smResumeNonResumableFromNew() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
sendMessage(inboxId, 0, "NON-RESUMABLE");
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS);
sm.stop();
List<String> states = MqTestUtil.getMessages(dataSource, inboxId, 0)
.stream()
.peek(System.out::println)
.map(MqMessageRow::function)
.toList();
assertEquals(List.of("NON-RESUMABLE", "OK", "END"), states);
}
@Test
public void smResumeNonResumableFromAck() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
long id = sendMessage(inboxId, 0, "NON-RESUMABLE");
persistence.updateMessageState(id, MqMessageState.ACK);
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS);
sm.stop();
List<String> states = MqTestUtil.getMessages(dataSource, inboxId, 0)
.stream()
.peek(System.out::println)
.map(MqMessageRow::function)
.toList();
assertEquals(List.of("NON-RESUMABLE", "ERROR"), states);
}
@Test
public void smResumeEmptyQueue() throws Exception {
var stateFactory = new ActorStateFactory(new GsonBuilder().create());
var sm = new ActorStateMachine(messageQueueFactory, inboxId, 0, UUID.randomUUID(), new ResumeTrialsPrototypeActor(stateFactory));
sm.join(2, TimeUnit.SECONDS);
sm.stop();
List<String> states = MqTestUtil.getMessages(dataSource, inboxId, 0)
.stream()
.peek(System.out::println)
.map(MqMessageRow::function)
.toList();
assertEquals(List.of(), states);
}
}

View File

@ -5,7 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.actor.ActorStateMachine;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.control.actor.rebalance.RebalanceActor;
import nu.marginalia.model.gson.GsonFactory;
@ -25,7 +25,7 @@ public class ControlActorService {
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<ControlActor, ActorStateMachine> stateMachines = new HashMap<>();
public Map<ControlActor, AbstractActorPrototype> actorDefinitions = new HashMap<>();
public Map<ControlActor, ActorPrototype> actorDefinitions = new HashMap<>();
private final int node;
@Inject
public ControlActorService(MessageQueueFactory messageQueueFactory,
@ -37,10 +37,10 @@ public class ControlActorService {
this.gson = GsonFactory.get();
this.node = baseServiceParams.configuration.node();
register(ControlActor.REBALANCE, rebalanceActor);
// register(ControlActor.REBALANCE, rebalanceActor);
}
private void register(ControlActor process, AbstractActorPrototype graph) {
private void register(ControlActor process, ActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
@ -104,7 +104,7 @@ public class ControlActorService {
return actorDefinitions.get(actor).isDirectlyInitializable();
}
public AbstractActorPrototype getActorDefinition(ControlActor actor) {
public ActorPrototype getActorDefinition(ControlActor actor) {
return actorDefinitions.get(actor);
}

View File

@ -1,26 +1,7 @@
package nu.marginalia.control.actor.rebalance;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import org.jetbrains.annotations.NotNull;
import java.sql.SQLException;
import java.util.*;
import com.google.gson.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RebalanceActor extends AbstractActorPrototype {
public class RebalanceActor {
/**
// States
public static final String INIT = "INIT";
@ -149,7 +130,7 @@ public class RebalanceActor extends AbstractActorPrototype {
//-- append to receiver crawler log
//-- instruct donor to delete file
//
//4. regenerate crawler logs based on present files on all donor nodes */
//4. regenerate crawler logs based on present files on all donor nodes * /
public record Sur(int n, int c) implements Comparable<Sur> {
@Override
@ -180,5 +161,5 @@ public class RebalanceActor extends AbstractActorPrototype {
}
public record Give(int donor, int dest, int c) {
}
} */
}

View File

@ -1,78 +0,0 @@
package nu.marginalia.control.actor.rebalance;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
import static nu.marginalia.control.actor.rebalance.RebalanceActor.*;
import static org.junit.Assert.assertEquals;
class RebalanceActorTest {
RebalanceActor actor = new RebalanceActor(null, null, null, null);
@Test
void calculateTransactions1_2() {
var transactions = actor.calculateTransactions(
List.of(new Pop(1, 100), new Pop(2, 0))
);
var expected = List.of(new Give(1, 2, 50));
Assertions.assertEquals(expected, transactions);
}
@Test
void calculateTransactions1_3() {
var transactions = actor.calculateTransactions(
List.of(
new Pop(1, 90),
new Pop(2, 0),
new Pop(3, 0)
)
);
var expected = List.of(
new Give(1, 2, 30),
new Give(1, 3, 30)
);
Assertions.assertEquals(expected, transactions);
}
@Test
void calculateTransactions2_3() {
var transactions = actor.calculateTransactions(
List.of(
new Pop(1, 30),
new Pop(2, 30),
new Pop(3, 0)
)
);
var expected = List.of(
new Give(1, 3, 10),
new Give(2, 3, 10)
);
Assertions.assertEquals(expected, transactions);
}
@Test
void calculateTransactionsEmpty() {
try {
actor.calculateTransactions(List.of());
Assertions.fail("Expected transition");
}
catch (AbstractActorPrototype.ControlFlowException ex) {
Assertions.assertEquals("END", ex.getState());
}
try {
actor.calculateTransactions(List.of(new Pop(1, 100)));
Assertions.fail("Expected transition");
}
catch (AbstractActorPrototype.ControlFlowException ex) {
Assertions.assertEquals("END", ex.getState());
}
}
}

View File

@ -29,7 +29,7 @@ public class ActorApi {
public Object startActor(Request request, Response response) throws Exception {
ExecutorActor actor = translateActor(request.params("id"));
actors.startJSON(actor, request.body());
actors.start(actor);
return "";
}

View File

@ -13,10 +13,8 @@ public enum ExecutorActor {
ADJACENCY_CALCULATION,
CRAWL_JOB_EXTRACTOR,
EXPORT_DATA,
TRUNCATE_LINK_DATABASE,
PROC_INDEX_CONSTRUCTOR_SPAWNER,
CONVERT,
TRANSFER_DOMAINS,
RESTORE_BACKUP;
public String id() {

View File

@ -6,8 +6,10 @@ import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.actor.monitor.*;
import nu.marginalia.actor.proc.*;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.ActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.task.*;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
@ -26,7 +28,7 @@ public class ExecutorActorControlService {
private final Gson gson;
private final MessageQueueFactory messageQueueFactory;
public Map<ExecutorActor, ActorStateMachine> stateMachines = new HashMap<>();
public Map<ExecutorActor, AbstractActorPrototype> actorDefinitions = new HashMap<>();
public Map<ExecutorActor, ActorPrototype> actorDefinitions = new HashMap<>();
private final int node;
@Inject
public ExecutorActorControlService(MessageQueueFactory messageQueueFactory,
@ -45,9 +47,7 @@ public class ExecutorActorControlService {
IndexConstructorMonitorActor indexConstructorMonitorActor,
TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor,
CrawlJobExtractorActor crawlJobExtractorActor,
ExportDataActor exportDataActor,
TruncateLinkDatabase truncateLinkDatabase,
TransferDomainsActor transferDomainsActor
ExportDataActor exportDataActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
@ -72,55 +72,68 @@ public class ExecutorActorControlService {
register(ExecutorActor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor);
register(ExecutorActor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor);
register(ExecutorActor.EXPORT_DATA, exportDataActor);
register(ExecutorActor.TRUNCATE_LINK_DATABASE, truncateLinkDatabase);
register(ExecutorActor.TRANSFER_DOMAINS, transferDomainsActor);
}
private void register(ExecutorActor process, AbstractActorPrototype graph) {
private void register(ExecutorActor process, ActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
actorDefinitions.put(process, graph);
}
private void register(ExecutorActor process, RecordActorPrototype graph) {
var sm = new ActorStateMachine(messageQueueFactory, process.id(), node, UUID.randomUUID(), graph);
sm.listen((function, param) -> logStateChange(process, function));
stateMachines.put(process, sm);
actorDefinitions.put(process, graph);
}
private void logStateChange(ExecutorActor process, String state) {
eventLog.logEvent("FSM-STATE-CHANGE", process.id() + " -> " + state);
}
public void startFrom(ExecutorActor process, String state) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state);
}
public void start(ExecutorActor process) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init();
}
public <T> void startFrom(ExecutorActor process, String state, Object arg) throws Exception {
public <T> void startFrom(ExecutorActor process, ActorStep step) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, gson.toJson(arg));
stateMachines.get(process).initFrom(
step.getClass().getSimpleName().toUpperCase(),
gson.toJson(step)
);
}
public <T> void startFromJSON(ExecutorActor process, String state, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).initFrom(state, json);
if (json.isBlank()) {
stateMachines.get(process).initFrom(state);
}
else {
stateMachines.get(process).initFrom(state, json);
}
}
@Deprecated
public <T> void start(ExecutorActor process, Object arg) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(gson.toJson(arg));
}
@Deprecated
public <T> void startJSON(ExecutorActor process, String json) throws Exception {
eventLog.logEvent("FSM-START", process.id());
stateMachines.get(process).init(json);
if (json.isBlank()) {
stateMachines.get(process).init();
}
else {
stateMachines.get(process).init(json);
}
}
@SneakyThrows
public void stop(ExecutorActor process) {
@ -140,7 +153,7 @@ public class ExecutorActorControlService {
return actorDefinitions.get(actor).isDirectlyInitializable();
}
public AbstractActorPrototype getActorDefinition(ExecutorActor actor) {
public ActorPrototype getActorDefinition(ExecutorActor actor) {
return actorDefinitions.get(actor);
}

View File

@ -1,12 +1,10 @@
package nu.marginalia.actor.monitor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorTerminalState;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.*;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.persistence.MqPersistence;
@ -22,38 +20,97 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class AbstractProcessSpawnerActor extends AbstractActorPrototype {
public class AbstractProcessSpawnerActor extends RecordActorPrototype {
private final MqPersistence persistence;
private final ProcessService processService;
private final Logger logger = LoggerFactory.getLogger(getClass());
public static final String INITIAL = "INITIAL";
public static final String MONITOR = "MONITOR";
public static final String RUN = "RUN";
public static final String ERROR = "ERROR";
public static final String ABORTED = "ABORTED";
public static final String END = "END";
public static final int MAX_ATTEMPTS = 3;
private final String inboxName;
private final ProcessService.ProcessId processId;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private final int node;
public record Initial() implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Monitor(int errorAttempts) implements ActorStep {}
public record Run(int attempts) implements ActorStep {}
@Terminal
public record Aborted() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> new Monitor(0);
case Monitor (int errorAttempts) -> {
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
synchronized (processId) {
processId.wait(5000);
}
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
yield new Monitor(0);
}
// else continue
} else {
yield new Run(0);
}
}
}
case Run (int attempts) -> {
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS)
yield new Run(attempts + 1);
else
yield new Error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
yield new Error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
yield new Aborted();
}
yield new Monitor(attempts);
}
default -> new Error();
};
}
public String describe() {
return "Spawns a(n) " + processId + " process and monitors its inbox for messages";
}
@Inject
public AbstractProcessSpawnerActor(ActorStateFactory stateFactory,
public AbstractProcessSpawnerActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService,
String inboxName,
ProcessService.ProcessId processId) {
super(stateFactory);
super(gson);
this.node = configuration.node();
this.persistence = persistence;
this.processService = processService;
@ -61,90 +118,6 @@ public class AbstractProcessSpawnerActor extends AbstractActorPrototype {
this.processId = processId;
}
@ActorState(name = INITIAL, next = MONITOR)
public void init() {
}
@ActorState(name = MONITOR,
next = MONITOR,
resume = ActorResumeBehavior.RETRY,
transitions = {MONITOR, RUN},
description = """
Monitors the inbox of the process for messages.
If a message is found, transition to RUN.
The state takes an optional Integer parameter errorAttempts
that is passed to run. errorAttempts is set to zero after
a few seconds of silence.
"""
)
public void monitor(Integer errorAttempts) throws SQLException, InterruptedException {
if (errorAttempts == null) {
errorAttempts = 0;
}
for (;;) {
var messages = persistence.eavesdrop(inboxName, 1);
if (messages.isEmpty() && !processService.isRunning(processId)) {
TimeUnit.SECONDS.sleep(5);
if (errorAttempts > 0) { // Reset the error counter if there is silence in the inbox
transition(MONITOR, 0);
}
// else continue
} else {
transition(RUN, errorAttempts);
}
}
}
@ActorState(name = RUN,
resume = ActorResumeBehavior.RESTART,
transitions = {MONITOR, ERROR, RUN, ABORTED},
description = """
Runs the process.
If the process fails, retransition to RUN up to MAX_ATTEMPTS times.
After MAX_ATTEMPTS at restarting the process, transition to ERROR.
If the process is cancelled, transition to ABORTED.
If the process is successful, transition to MONITOR(errorAttempts).
"""
)
public void run(Integer attempts) throws Exception {
if (attempts == null)
attempts = 0;
try {
long startTime = System.currentTimeMillis();
var exec = new TaskExecution();
long endTime = System.currentTimeMillis();
if (exec.isError()) {
if (attempts < MAX_ATTEMPTS) transition(RUN, attempts + 1);
else error();
}
else if (endTime - startTime < TimeUnit.SECONDS.toMillis(1)) {
// To avoid boot loops, we transition to error if the process
// didn't run for longer than 1 seconds. This might happen if
// the process crashes before it can reach the heartbeat and inbox
// stages of execution. In this case it would not report having acted
// on its message, and the process would be restarted forever without
// the attempts counter incrementing.
error("Process terminated within 1 seconds of starting");
}
}
catch (InterruptedException ex) {
// We get this exception when the process is cancelled by the user
processService.kill(processId);
setCurrentMessageToDead();
transition(ABORTED);
}
transition(MONITOR, attempts);
}
/** Sets the message to dead in the database to avoid
* the service respawning on the same task when we
* re-enable this actor */
@ -163,10 +136,6 @@ public class AbstractProcessSpawnerActor extends AbstractActorPrototype {
}
}
@ActorTerminalState(name = ABORTED, description = "The process was manually aborted")
public void aborted() throws Exception {}
/** Encapsulates the execution of the process in a separate thread so that
* we can interrupt the thread if the process is cancelled */
private class TaskExecution {

View File

@ -1,12 +1,13 @@
package nu.marginalia.actor.monitor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorage;
@ -24,21 +25,65 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
@Singleton
public class FileStorageMonitorActor extends AbstractActorPrototype {
public class FileStorageMonitorActor extends RecordActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES
private static final String INITIAL = "INITIAL";
private static final String MONITOR = "MONITOR";
private static final String PURGE = "PURGE";
private static final String REMOVE_STALE = "REMOVE-STALE";
private static final String END = "END";
private final HikariDataSource dataSource;
private final FileStorageService fileStorageService;
private final int node;
public record Initial() implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RETRY)
public record Monitor() implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RESTART)
public record Purge(FileStorageId id) implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RESTART)
public record RemoveStale(FileStorageId id) implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> new Monitor();
case Purge (FileStorageId id) -> {
var storage = fileStorageService.getStorage(id);
logger.info("Deleting {} ", storage.path());
Path path = storage.asPath();
if (Files.exists(path)) {
FileUtils.deleteDirectory(path.toFile());
}
fileStorageService.deregisterFileStorage(storage.id());
yield new Monitor();
}
case RemoveStale(FileStorageId id) -> {
fileStorageService.deregisterFileStorage(id);
yield new Monitor();
}
case Monitor m -> {
for (;;) {
Optional<FileStorage> toDeleteOpt = findFileStorageToDelete();
if (toDeleteOpt.isPresent()) {
yield new Purge(toDeleteOpt.get().id());
}
List<FileStorage> allStorageItems = fileStorageService.getEachFileStorage();
var missing = allStorageItems.stream().filter(storage -> !Files.exists(storage.asPath())).findAny();
if (missing.isPresent()) {
yield new RemoveStale(missing.get().id());
}
fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.STORAGE));
fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.BACKUP));
TimeUnit.SECONDS.sleep(10);
}
}
default -> new Error();
};
}
@Override
public String describe() {
return "Monitor the file storage directories and purge any file storage area that has been marked for deletion," +
@ -46,82 +91,16 @@ public class FileStorageMonitorActor extends AbstractActorPrototype {
}
@Inject
public FileStorageMonitorActor(ActorStateFactory stateFactory,
public FileStorageMonitorActor(Gson gson,
HikariDataSource dataSource,
ServiceConfiguration serviceConfiguration,
FileStorageService fileStorageService) {
super(stateFactory);
super(gson);
this.dataSource = dataSource;
this.fileStorageService = fileStorageService;
this.node = serviceConfiguration.node();
}
@ActorState(name = INITIAL, next = MONITOR)
public void init() {
}
@ActorState(name = MONITOR,
next = PURGE,
resume = ActorResumeBehavior.RETRY,
transitions = { PURGE, REMOVE_STALE },
description = """
Monitor the file storage and trigger at transition to PURGE if any file storage area
has been marked for deletion.
""")
public void monitor() throws Exception {
for (;;) {
Optional<FileStorage> toDeleteOpt = findFileStorageToDelete();
if (toDeleteOpt.isPresent()) {
transition(PURGE, toDeleteOpt.get().id());
}
List<FileStorage> allStorageItems = fileStorageService.getEachFileStorage();
var missing = allStorageItems.stream().filter(storage -> !Files.exists(storage.asPath())).findAny();
if (missing.isPresent()) {
transition(REMOVE_STALE, missing.get().id());
}
fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.STORAGE));
fileStorageService.synchronizeStorageManifests(fileStorageService.getStorageBase(FileStorageBaseType.BACKUP));
TimeUnit.SECONDS.sleep(10);
}
}
@ActorState(name = PURGE,
next = MONITOR,
resume = ActorResumeBehavior.RETRY,
description = """
Purge the file storage area and transition back to MONITOR.
"""
)
public void purge(FileStorageId id) throws Exception {
var storage = fileStorageService.getStorage(id);
logger.info("Deleting {} ", storage.path());
Path path = storage.asPath();
if (Files.exists(path)) {
FileUtils.deleteDirectory(path.toFile());
}
fileStorageService.deregisterFileStorage(storage.id());
}
@ActorState(
name = REMOVE_STALE,
next = MONITOR,
resume = ActorResumeBehavior.RETRY,
description = """
Remove file storage from the database if it doesn't exist on disk.
"""
)
public void removeStale(FileStorageId id) throws SQLException {
fileStorageService.deregisterFileStorage(id);
}
public Optional<FileStorage> findFileStorageToDelete() {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""

View File

@ -1,52 +1,53 @@
package nu.marginalia.actor.monitor;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.mq.persistence.MqPersistence;
import java.util.concurrent.TimeUnit;
@Singleton
public class MessageQueueMonitorActor extends AbstractActorPrototype {
public class MessageQueueMonitorActor extends RecordActorPrototype {
// STATES
private static final String INITIAL = "INITIAL";
private static final String MONITOR = "MONITOR";
private static final String END = "END";
private final MqPersistence persistence;
public record Initial() implements ActorStep {}
@Resume(behavior=ActorResumeBehavior.RETRY)
public record Monitor() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial i -> new Monitor();
case Monitor m -> {
for (;;) {
persistence.reapDeadMessages();
persistence.cleanOldMessages();
TimeUnit.SECONDS.sleep(60);
}
}
default -> new Error();
};
}
@Inject
public MessageQueueMonitorActor(Gson gson,
MqPersistence persistence) {
super(gson);
this.persistence = persistence;
}
@Override
public String describe() {
return "Periodically run maintenance tasks on the message queue";
}
@Inject
public MessageQueueMonitorActor(ActorStateFactory stateFactory,
MqPersistence persistence) {
super(stateFactory);
this.persistence = persistence;
}
@ActorState(name = INITIAL, next = MONITOR)
public void init() {
}
@ActorState(name = MONITOR, next = MONITOR, resume = ActorResumeBehavior.RETRY,
description = """
Periodically clean up the message queue.
""")
public void monitor() throws Exception {
for (;;) {
persistence.reapDeadMessages();
persistence.cleanOldMessages();
TimeUnit.SECONDS.sleep(60);
}
}
}

View File

@ -1,8 +1,8 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
@ -14,11 +14,11 @@ public class ConverterMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public ConverterMonitorActor(ActorStateFactory stateFactory,
public ConverterMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory,
super(gson,
configuration,
persistence,
processService,

View File

@ -1,8 +1,8 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
@ -13,11 +13,11 @@ import nu.marginalia.service.module.ServiceConfiguration;
public class CrawlerMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public CrawlerMonitorActor(ActorStateFactory stateFactory,
public CrawlerMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory,
super(gson,
configuration,
persistence,
processService,

View File

@ -1,8 +1,8 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
@ -14,11 +14,11 @@ public class IndexConstructorMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public IndexConstructorMonitorActor(ActorStateFactory stateFactory,
public IndexConstructorMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory,
super(gson,
configuration,
persistence,
processService,

View File

@ -1,8 +1,8 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.monitor.AbstractProcessSpawnerActor;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.persistence.MqPersistence;
@ -14,12 +14,12 @@ public class LoaderMonitorActor extends AbstractProcessSpawnerActor {
@Inject
public LoaderMonitorActor(ActorStateFactory stateFactory,
public LoaderMonitorActor(Gson gson,
ServiceConfiguration configuration,
MqPersistence persistence,
ProcessService processService) {
super(stateFactory,
super(gson,
configuration,
persistence, processService,
ProcessInboxNames.LOADER_INBOX,

View File

@ -1,14 +1,14 @@
package nu.marginalia.actor.proc;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.process.ProcessService;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.module.ServiceConfiguration;
import java.sql.SQLException;
import java.util.ArrayList;
@ -16,24 +16,55 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
@Singleton
public class ProcessLivenessMonitorActor extends AbstractActorPrototype {
public class ProcessLivenessMonitorActor extends RecordActorPrototype {
// STATES
private static final String INITIAL = "INITIAL";
private static final String MONITOR = "MONITOR";
private static final String END = "END";
private final ServiceEventLog eventLogService;
private final ProcessService processService;
private final HikariDataSource dataSource;
private final int node;
public record Initial() implements ActorStep {}
public record Monitor() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial() -> new Monitor();
case Monitor() -> {
for (;;) {
for (var heartbeat : getProcessHeartbeats()) {
if (!heartbeat.isRunning()) continue;
var processId = heartbeat.getProcessId();
if (null == processId) continue;
if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000)
continue;
flagProcessAsStopped(heartbeat);
}
for (var heartbeat : getTaskHeartbeats()) {
if (heartbeat.lastSeenMillis() < 10_000) continue;
removeTaskHeartbeat(heartbeat);
}
TimeUnit.SECONDS.sleep(60);
}
}
default -> new Error();
};
}
@Inject
public ProcessLivenessMonitorActor(ActorStateFactory stateFactory,
public ProcessLivenessMonitorActor(Gson gson,
ServiceEventLog eventLogService,
ServiceConfiguration configuration,
ProcessService processService,
HikariDataSource dataSource) {
super(stateFactory);
super(gson);
this.node = configuration.node();
this.eventLogService = eventLogService;
this.processService = processService;
this.dataSource = dataSource;
@ -44,49 +75,6 @@ public class ProcessLivenessMonitorActor extends AbstractActorPrototype {
return "Periodically check to ensure that the control service's view of running processes is agreement with the process heartbeats table.";
}
@ActorState(name = INITIAL, next = MONITOR)
public void init() {
}
@ActorState(name = MONITOR, next = MONITOR, resume = ActorResumeBehavior.RETRY, description = """
Periodically check to ensure that the control service's view of
running processes is agreement with the process heartbeats table.
If the process is not running, mark the process as stopped in the table.
""")
public void monitor() throws Exception {
for (;;) {
for (var heartbeat : getProcessHeartbeats()) {
if (!heartbeat.isRunning()) {
continue;
}
var processId = heartbeat.getProcessId();
if (null == processId)
continue;
if (processService.isRunning(processId) && heartbeat.lastSeenMillis() < 10_000) {
continue;
}
flagProcessAsStopped(heartbeat);
}
for (var heartbeat : getTaskHeartbeats()) {
if (heartbeat.lastSeenMillis() < 10_000) {
continue;
}
removeTaskHeartbeat(heartbeat);
}
TimeUnit.SECONDS.sleep(60);
}
}
private List<ProcessHeartbeat> getProcessHeartbeats() {
List<ProcessHeartbeat> heartbeats = new ArrayList<>();
@ -95,8 +83,10 @@ public class ProcessLivenessMonitorActor extends AbstractActorPrototype {
SELECT PROCESS_NAME, PROCESS_BASE, INSTANCE, STATUS, PROGRESS,
TIMESTAMPDIFF(MICROSECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) AS TSDIFF
FROM PROCESS_HEARTBEAT
WHERE NODE = ?
""")) {
stmt.setInt(1, node);
var rs = stmt.executeQuery();
while (rs.next()) {
int progress = rs.getInt("PROGRESS");
@ -143,7 +133,9 @@ public class ProcessLivenessMonitorActor extends AbstractActorPrototype {
var stmt = conn.prepareStatement("""
SELECT TASK_NAME, TASK_BASE, INSTANCE, SERVICE_INSTANCE, STATUS, STAGE_NAME, PROGRESS, TIMESTAMPDIFF(MICROSECOND, TASK_HEARTBEAT.HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) AS TSDIFF
FROM TASK_HEARTBEAT
WHERE NODE=?
""")) {
stmt.setInt(1, node);
var rs = stmt.executeQuery();
while (rs.next()) {
int progress = rs.getInt("PROGRESS");

View File

@ -2,7 +2,7 @@ package nu.marginalia.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorControlFlowException;
import nu.marginalia.process.ProcessService;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.outbox.MqOutbox;
@ -29,11 +29,15 @@ public class ActorProcessWatcher {
* When interrupted, the process is killed and the message is marked as dead.
*/
public MqMessage waitResponse(MqOutbox outbox, ProcessService.ProcessId processId, long msgId)
throws AbstractActorPrototype.ControlFlowException, InterruptedException, SQLException
throws ActorControlFlowException, InterruptedException, SQLException
{
synchronized (processId) {
// Wake up the process spawning actor
processId.notifyAll();
}
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
throw new AbstractActorPrototype.ControlFlowException("ERROR",
"Process " + processId + " did not launch");
throw new ActorControlFlowException("Process " + processId + " did not launch");
}
for (;;) {
@ -52,8 +56,7 @@ public class ActorProcessWatcher {
catch (TimeoutException ex) {
// Maybe the process died, wait a moment for it to restart
if (!waitForProcess(processId, TimeUnit.SECONDS, 30)) {
throw new AbstractActorPrototype.ControlFlowException("ERROR",
"Process " + processId + " died and did not re-launch");
throw new ActorControlFlowException("Process " + processId + " died and did not re-launch");
}
}
}

View File

@ -3,13 +3,10 @@ package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
@ -21,41 +18,140 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.converting.ConvertAction;
import nu.marginalia.mqapi.converting.ConvertRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Files;
import java.nio.file.Path;
@Singleton
public class ConvertActor extends AbstractActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String CONVERT = "CONVERT";
public static final String CONVERT_ENCYCLOPEDIA = "CONVERT_ENCYCLOPEDIA";
public static final String CONVERT_DIRTREE = "CONVERT_DIRTREE";
public static final String CONVERT_STACKEXCHANGE = "CONVERT_STACKEXCHANGE";
public static final String CONVERT_WAIT = "CONVERT-WAIT";
public static final String END = "END";
public class ConvertActor extends RecordActorPrototype {
private final ActorProcessWatcher processWatcher;
private final MqOutbox mqConverterOutbox;
private final FileStorageService storageService;
private final Gson gson;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Convert(FileStorageId fid) implements ActorStep {};
public record ConvertEncyclopedia(String source) implements ActorStep {};
public record ConvertDirtree(String source) implements ActorStep {};
public record ConvertStackexchange(String source) implements ActorStep {};
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ConvertWait(FileStorageId destFid,
long msgId) implements ActorStep {};
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Convert (FileStorageId fid) -> {
var toProcess = storageService.getStorage(fid);
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Data; " + toProcess.description());
storageService.relateFileStorages(toProcess.id(), processedArea.id());
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
fid,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
);
}
case ConvertDirtree(String source) -> {
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
yield new Error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Dirtree Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadDirtree,
sourcePath.toString(),
null,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
);
}
case ConvertEncyclopedia(String source) -> {
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
yield new Error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Encylopedia Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadEncyclopedia,
sourcePath.toString(),
null,
processedArea.id());
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
public FileStorageId crawlStorageId = null;
public FileStorageId processedStorageId = null;
public long converterMsgId = 0L;
public long loaderMsgId = 0L;
};
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
);
}
case ConvertStackexchange(String source) -> {
public record WaitInstructions(long msgId, FileStorageId dest) { }
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
yield new Error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Stackexchange Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadStackexchange,
sourcePath.toString(),
null,
processedArea.id());
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request))
);
}
case ConvertWait(FileStorageId destFid, long msgId) -> {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Converter failed");
}
storageService.setFileStorageState(destFid, FileStorageState.UNSET);
yield new End();
}
default -> new Error();
};
}
@Override
public String describe() {
@ -63,172 +159,15 @@ public class ConvertActor extends AbstractActorPrototype {
}
@Inject
public ConvertActor(ActorStateFactory stateFactory,
ActorProcessWatcher processWatcher,
public ConvertActor(ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes,
FileStorageService storageService,
Gson gson
)
Gson gson)
{
super(stateFactory);
super(gson);
this.processWatcher = processWatcher;
this.mqConverterOutbox = processOutboxes.getConverterOutbox();
this.storageService = storageService;
this.gson = gson;
}
@ActorState(name= INITIAL, resume = ActorResumeBehavior.ERROR,
description = "Pro forma initial state")
public void initial(Integer unused) {
error("This actor does not support the initial state");
}
@ActorState(name = CONVERT,
next = CONVERT_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT.
"""
)
public Long convert(FileStorageId sourceStorageId) throws Exception {
// Create processed data area
var toProcess = storageService.getStorage(sourceStorageId);
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Data; " + toProcess.description());
storageService.relateFileStorages(toProcess.id(), processedArea.id());
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
sourceStorageId,
processedArea.id());
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(name = CONVERT_ENCYCLOPEDIA,
next = CONVERT_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT.
"""
)
public Long convertEncyclopedia(String source) throws Exception {
// Create processed data area
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Encylopedia Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadEncyclopedia,
sourcePath.toString(),
null,
processedArea.id());
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(name = CONVERT_DIRTREE,
next = CONVERT_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT.
"""
)
public Long convertDirtree(String source) throws Exception {
// Create processed data area
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Dirtree Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadDirtree,
sourcePath.toString(),
null,
processedArea.id());
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(name = CONVERT_STACKEXCHANGE,
next = CONVERT_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT.
"""
)
public Long convertStackexchange(String source) throws Exception {
// Create processed data area
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base,
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Stackexchange Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.SideloadStackexchange,
sourcePath.toString(),
null,
processedArea.id());
return mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
}
@ActorState(
name = CONVERT_WAIT,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the converter to finish processing the data.
"""
)
public void convertWait(WaitInstructions instructions) throws Exception {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, instructions.msgId());
if (rsp.state() != MqMessageState.OK) {
error("Converter failed");
}
storageService.setFileStorageState(instructions.dest, FileStorageState.UNSET);
}
}

View File

@ -6,10 +6,10 @@ import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
@ -32,12 +32,11 @@ import nu.marginalia.mqapi.loading.LoadRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.SQLException;
import java.util.List;
@Singleton
public class ConvertAndLoadActor extends AbstractActorPrototype {
public class ConvertAndLoadActor extends RecordActorPrototype {
// STATES
@ -75,24 +74,154 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
public long loaderMsgId = 0L;
};
public record Initial(FileStorageId fid) implements ActorStep {};
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) implements ActorStep {
public Convert(FileStorageId crawlId, FileStorageId processedId) { this(crawlId, processedId, -1); }
}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Load(List<FileStorageId> processedId, long msgId) implements ActorStep {
public Load(List<FileStorageId> processedId) { this(processedId, -1); }
};
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Backup(List<FileStorageId> processedIds) implements ActorStep { }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Repartition(long id) implements ActorStep { public Repartition() { this(-1); } }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ReindexFwd(long id) implements ActorStep { public ReindexFwd() { this(-1); } }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ReindexFull(long id) implements ActorStep { public ReindexFull() { this(-1); } }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ReindexPrio(long id) implements ActorStep { public ReindexPrio() { this(-1); } }
public record SwitchOver() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
logger.info("{}", self);
return switch (self) {
case Initial(FileStorageId fid) -> {
var storage = storageService.getStorage(fid);
if (storage == null) yield new Error("Bad storage id");
if (storage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + storage.type());
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Data; " + storage.description());
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
storageService.relateFileStorages(storage.id(), processedArea.id());
yield new Convert(fid, processedArea.id());
}
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) when msgId < 0 -> {
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
crawlId,
processedId);
yield new Convert(crawlId, processedId,
mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request)));
}
case Convert(FileStorageId crawlId, FileStorageId processedId, long msgId) -> {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, msgId);
if (rsp.state() != MqMessageState.OK)
yield new Error("Converter failed");
yield new Load(List.of(processedId));
}
case Load(List<FileStorageId> processedIds, long msgId) when msgId < 0 -> {
var request = new LoadRequest(processedIds);
long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request));
yield new Load(processedIds, id);
}
case Load(List<FileStorageId> processedIds, long msgId) -> {
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, msgId);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Loader failed");
} else {
cleanProcessedStorage(processedIds);
}
yield new Backup(processedIds);
}
case Backup(List<FileStorageId> processedIds) -> {
backupService.createBackupFromStaging(processedIds);
yield new Repartition();
}
case Repartition(long id) when id < 0 ->
new Repartition(indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""));
case Repartition(long id) -> {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Repartition failed");
}
yield new ReindexFwd();
}
case ReindexFwd(long id) when id < 0 -> new ReindexFwd(createIndex(IndexName.FORWARD));
case ReindexFwd(long id) -> {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");
else
yield new ReindexFull();
}
case ReindexFull(long id) when id < 0 -> new ReindexFull(createIndex(IndexName.REVERSE_FULL));
case ReindexFull(long id) -> {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");
else
yield new ReindexPrio();
}
case ReindexPrio(long id) when id < 0 -> new ReindexPrio(createIndex(IndexName.REVERSE_PRIO));
case ReindexPrio(long id) -> {
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");
else
yield new SwitchOver();
}
case SwitchOver() -> {
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_INDEX, ":^D");
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_LINKDB, ":-)");
yield new End();
}
default -> new Error();
};
}
private long createIndex(IndexName index) throws Exception {
return mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(),
gson.toJson(new CreateIndexRequest(index)));
}
@Override
public String describe() {
return "Process a set of crawl data and then load it into the database.";
}
@Inject
public ConvertAndLoadActor(ActorStateFactory stateFactory,
ActorProcessWatcher processWatcher,
public ConvertAndLoadActor(ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes,
FileStorageService storageService,
IndexClient indexClient,
BackupService backupService,
Gson gson,
NodeConfigurationService nodeConfigurationService,
ServiceConfiguration serviceConfiguration
)
ServiceConfiguration serviceConfiguration)
{
super(stateFactory);
super(gson);
this.processWatcher = processWatcher;
this.indexOutbox = indexClient.outbox();
this.mqConverterOutbox = processOutboxes.getConverterOutbox();
@ -106,98 +235,6 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
this.nodeId = serviceConfiguration.node();
}
@ActorState(name = INITIAL,
next = RECONVERT,
description = """
Validate the input and transition to RECONVERT
""")
public Message init(FileStorageId crawlStorageId) throws Exception {
if (null == crawlStorageId) {
error("This Actor requires a FileStorageId to be passed in as a parameter to INITIAL");
}
var storage = storageService.getStorage(crawlStorageId);
if (storage == null) error("Bad storage id");
if (storage.type() != FileStorageType.CRAWL_DATA) error("Bad storage type " + storage.type());
return new Message().withCrawlStorageId(crawlStorageId);
}
@ActorState(name = RECONVERT,
next = RECONVERT_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Allocate a storage area for the processed data,
then send a convert request to the converter and transition to RECONVERT_WAIT.
"""
)
public Message reconvert(Message message) throws Exception {
// Create processed data area
var toProcess = storageService.getStorage(message.crawlStorageId);
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var processedArea = storageService.allocateTemporaryStorage(base, FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Data; " + toProcess.description());
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
storageService.relateFileStorages(toProcess.id(), processedArea.id());
// Pre-send convert request
var request = new ConvertRequest(ConvertAction.ConvertCrawlData,
null,
message.crawlStorageId,
processedArea.id());
long id = mqConverterOutbox.sendAsync(ConvertRequest.class.getSimpleName(), gson.toJson(request));
return message
.withProcessedStorageId(List.of(processedArea.id()))
.withConverterMsgId(id);
}
@ActorState(
name = RECONVERT_WAIT,
next = LOAD,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the converter to finish processing the data.
"""
)
public Message reconvertWait(Message message) throws Exception {
var rsp = processWatcher.waitResponse(mqConverterOutbox, ProcessService.ProcessId.CONVERTER, message.converterMsgId);
if (rsp.state() != MqMessageState.OK)
error("Converter failed");
return message;
}
@ActorState(
name = LOAD,
next = BACKUP,
resume = ActorResumeBehavior.RETRY,
description = """
Instruct the loader to process the data
""")
public Message load(Message message) throws Exception {
if (message.loaderMsgId <= 0) {
var request = new LoadRequest(message.processedStorageId);
long id = mqLoaderOutbox.sendAsync(LoadRequest.class.getSimpleName(), gson.toJson(request));
transition(LOAD, message.withLoaderMsgId(id));
}
var rsp = processWatcher.waitResponse(mqLoaderOutbox, ProcessService.ProcessId.LOADER, message.loaderMsgId);
if (rsp.state() != MqMessageState.OK) {
error("Loader failed");
} else {
cleanProcessedStorage(message.processedStorageId);
}
return message;
}
private void cleanProcessedStorage(List<FileStorageId> processedStorageId) {
try {
var config = nodeConfigurationService.get(nodeId);
@ -218,112 +255,4 @@ public class ConvertAndLoadActor extends AbstractActorPrototype {
}
}
@ActorState(
name = BACKUP,
next = REPARTITION,
resume = ActorResumeBehavior.RETRY,
description = """
Create a backup snapshot of the new data
""")
public void createBackup(Message message) throws SQLException, IOException {
backupService.createBackupFromStaging(message.processedStorageId);
}
@ActorState(
name = REPARTITION,
next = REINDEX_FWD,
resume = ActorResumeBehavior.RETRY,
description = """
Instruct the index-service to repartition.
"""
)
public void repartition(Long id) throws Exception {
if (id == null) {
transition(REPARTITION, indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""));
}
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@ActorState(
name = REINDEX_FWD,
next = REINDEX_FULL,
resume = ActorResumeBehavior.RETRY,
description = """
Reconstruct the fwd index
"""
)
public void reindexFwd(Long id) throws Exception {
if (id == null) {
var request = new CreateIndexRequest(IndexName.FORWARD);
transition(REINDEX_FWD, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)));
}
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@ActorState(
name = REINDEX_FULL,
next = REINDEX_PRIO,
resume = ActorResumeBehavior.RETRY,
description = """
Reconstruct the reverse full index
"""
)
public void reindexFull(Long id) throws Exception {
if (id == null) {
var request = new CreateIndexRequest(IndexName.REVERSE_FULL);
transition(REINDEX_FULL, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)));
}
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@ActorState(
name = REINDEX_PRIO,
next = SWITCH_OVER,
resume = ActorResumeBehavior.RETRY,
description = """
Reconstruct the reverse prio index
"""
)
public void reindexPrio(Long id) throws Exception {
if (id == null) {
var request = new CreateIndexRequest(IndexName.REVERSE_PRIO);
transition(REINDEX_PRIO, mqIndexConstructorOutbox.sendAsync(CreateIndexRequest.class.getSimpleName(), gson.toJson(request)));
}
var rsp = mqIndexConstructorOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
error("Repartition failed");
}
}
@ActorState(
name = SWITCH_OVER,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Move the new lexicon into place, instruct the index service to
switch to the new linkdb, and the new index.
"""
)
public void switchOver(Long id) throws Exception {
// Notify index to switch over
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_INDEX, ":^D");
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_LINKDB, ":-)");
}
}

View File

@ -3,13 +3,10 @@ package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
@ -25,14 +22,8 @@ import org.slf4j.LoggerFactory;
import java.util.List;
@Singleton
public class CrawlActor extends AbstractActorPrototype {
public class CrawlActor extends RecordActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String CRAWL = "CRAWL";
public static final String CRAWL_WAIT = "CRAWL-WAIT";
public static final String END = "END";
private final MqOutbox mqCrawlerOutbox;
private final FileStorageService storageService;
private final Gson gson;
@ -40,13 +31,45 @@ public class CrawlActor extends AbstractActorPrototype {
private final ActorProcessWatcher processWatcher;
public record Initial(FileStorageId storageId) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Crawl(long messageId) implements ActorStep {}
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
public FileStorageId crawlSpecId = null;
public FileStorageId crawlStorageId = null;
public long crawlerMsgId = 0L;
};
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial (FileStorageId fid) -> {
var storage = storageService.getStorage(fid);
if (storage == null) yield new Error("Bad storage id");
if (storage.type() != FileStorageType.CRAWL_SPEC) yield new Error("Bad storage type " + storage.type());
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var dataArea = storageService.allocateTemporaryStorage(
base,
FileStorageType.CRAWL_DATA,
"crawl-data",
storage.description());
storageService.relateFileStorages(storage.id(), dataArea.id());
// Send convert request
var request = new CrawlRequest(List.of(fid), dataArea.id());
long msgId = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
yield new Crawl(msgId);
}
case Crawl(long msgId) -> {
var rsp = processWatcher.waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, msgId);
if (rsp.state() != MqMessageState.OK)
yield new Error("Crawler failed");
yield new End();
}
default -> new Error();
};
}
@Override
public String describe() {
@ -54,84 +77,16 @@ public class CrawlActor extends AbstractActorPrototype {
}
@Inject
public CrawlActor(ActorStateFactory stateFactory,
ProcessOutboxes processOutboxes,
public CrawlActor(ProcessOutboxes processOutboxes,
FileStorageService storageService,
Gson gson,
ActorProcessWatcher processWatcher)
{
super(stateFactory);
super(gson);
this.mqCrawlerOutbox = processOutboxes.getCrawlerOutbox();
this.storageService = storageService;
this.gson = gson;
this.processWatcher = processWatcher;
}
@ActorState(name = INITIAL,
next = CRAWL,
description = """
Validate the input and transition to CRAWL
""")
public Message init(FileStorageId crawlStorageId) throws Exception {
if (null == crawlStorageId) {
error("This Actor requires a FileStorageId to be passed in as a parameter to INITIAL");
}
var storage = storageService.getStorage(crawlStorageId);
if (storage == null) error("Bad storage id");
if (storage.type() != FileStorageType.CRAWL_SPEC) error("Bad storage type " + storage.type());
return new Message().withCrawlSpecId(crawlStorageId);
}
@ActorState(name = CRAWL,
next = CRAWL_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Allocate a storage area for the crawled data,
then send a crawl request to the crawler and transition to CRAWL_WAIT.
"""
)
public Message crawl(Message message) throws Exception {
// Create processed data area
var toCrawl = storageService.getStorage(message.crawlSpecId);
var base = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var dataArea = storageService.allocateTemporaryStorage(
base,
FileStorageType.CRAWL_DATA,
"crawl-data",
toCrawl.description());
storageService.relateFileStorages(toCrawl.id(), dataArea.id());
// Pre-send convert request
var request = new CrawlRequest(List.of(message.crawlSpecId), dataArea.id());
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
return message
.withCrawlStorageId(dataArea.id())
.withCrawlerMsgId(id);
}
@ActorState(
name = CRAWL_WAIT,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the crawler to finish retreiving the data.
"""
)
public Message crawlerWait(Message message) throws Exception {
var rsp = processWatcher.waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, message.crawlerMsgId);
if (rsp.state() != MqMessageState.OK)
error("Crawler failed");
return message;
}
}

View File

@ -1,15 +1,11 @@
package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.crawlspec.CrawlSpecFileNames;
import nu.marginalia.db.DbDomainStatsExportMultitool;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.storage.model.FileStorageType;
@ -24,66 +20,58 @@ import java.nio.file.StandardOpenOption;
import static nu.marginalia.crawlspec.CrawlSpecGenerator.*;
@Singleton
public class CrawlJobExtractorActor extends AbstractActorPrototype {
public class CrawlJobExtractorActor extends RecordActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES
public static final String CREATE_FROM_LINK = "CREATE_FROM_LINK";
public static final String END = "END";
private final FileStorageService fileStorageService;
@Inject
public CrawlJobExtractorActor(ActorStateFactory stateFactory,
public CrawlJobExtractorActor(Gson gson,
FileStorageService fileStorageService
) {
super(stateFactory);
super(gson);
this.fileStorageService = fileStorageService;
}
public record CrawlJobExtractorArguments(String description) { }
public record CrawlJobExtractorArgumentsWithURL(String description, String url) { }
public record CreateFromUrl(String description, String url) implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case CreateFromUrl(String description, String url) -> {
var base = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE);
var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", description);
Path urlsTxt = storage.asPath().resolve("urls.txt");
try (var os = Files.newOutputStream(urlsTxt, StandardOpenOption.CREATE_NEW);
var is = new URL(url).openStream())
{
is.transferTo(os);
}
catch (Exception ex) {
fileStorageService.flagFileForDeletion(storage.id());
yield new Error("Error downloading " + url);
}
final Path path = CrawlSpecFileNames.resolve(storage);
generateCrawlSpec(
path,
DomainSource.fromFile(urlsTxt),
KnownUrlsCountSource.fixed(200),
KnownUrlsListSource.justIndex()
);
yield new End();
}
default -> new Error();
};
}
@Override
public String describe() {
return "Run the crawler job extractor process";
}
@ActorState(name = CREATE_FROM_LINK, next = END,
resume = ActorResumeBehavior.ERROR,
description = """
Download a list of URLs as provided,
and then spawn a CrawlJobExtractor process,
then wait for it to finish.
"""
)
public void createFromFromLink(CrawlJobExtractorArgumentsWithURL arg) throws Exception {
if (arg == null) {
error("This actor requires a CrawlJobExtractorArgumentsWithURL argument");
}
var base = fileStorageService.getStorageBase(FileStorageBaseType.STORAGE);
var storage = fileStorageService.allocateTemporaryStorage(base, FileStorageType.CRAWL_SPEC, "crawl-spec", arg.description());
Path urlsTxt = storage.asPath().resolve("urls.txt");
try (var os = Files.newOutputStream(urlsTxt, StandardOpenOption.CREATE_NEW);
var is = new URL(arg.url()).openStream())
{
is.transferTo(os);
}
catch (Exception ex) {
fileStorageService.flagFileForDeletion(storage.id());
error("Error downloading " + arg.url());
}
final Path path = CrawlSpecFileNames.resolve(storage);
generateCrawlSpec(
path,
DomainSource.fromFile(urlsTxt),
KnownUrlsCountSource.fixed(200),
KnownUrlsListSource.justIndex()
);
}
}

View File

@ -1,15 +1,14 @@
package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType;
@ -25,20 +24,12 @@ import java.nio.file.attribute.PosixFilePermissions;
import java.util.zip.GZIPOutputStream;
@Singleton
public class ExportDataActor extends AbstractActorPrototype {
public class ExportDataActor extends RecordActorPrototype {
private static final String blacklistFilename = "blacklist.csv.gz";
private static final String domainsFilename = "domains.csv.gz";
private static final String linkGraphFilename = "linkgraph.csv.gz";
// STATES
public static final String INITIAL = "INITIAL";
public static final String EXPORT_DOMAINS = "EXPORT-DOMAINS";
public static final String EXPORT_BLACKLIST = "EXPORT-BLACKLIST";
public static final String EXPORT_LINK_GRAPH = "EXPORT-LINK-GRAPH";
public static final String END = "END";
private final FileStorageService storageService;
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -48,149 +39,128 @@ public class ExportDataActor extends AbstractActorPrototype {
public FileStorageId storageId = null;
};
public record Export() implements ActorStep {}
public record ExportBlacklist(FileStorageId fid) implements ActorStep {}
public record ExportDomains(FileStorageId fid) implements ActorStep {}
public record ExportLinkGraph(FileStorageId fid) implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export() -> {
var storage = storageService.getStorageByType(FileStorageType.EXPORT);
if (storage == null) yield new Error("Bad storage id");
yield new ExportBlacklist(storage.id());
}
case ExportBlacklist(FileStorageId fid) -> {
var storage = storageService.getStorage(fid);
var tmpFile = Files.createTempFile(storage.asPath(), "export", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT URL_DOMAIN FROM EC_DOMAIN_BLACKLIST");
)
{
stmt.setFetchSize(1000);
var rs = stmt.executeQuery();
while (rs.next()) {
bw.write(rs.getString(1));
bw.write("\n");
}
Files.move(tmpFile, storage.asPath().resolve(blacklistFilename), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception ex) {
logger.error("Failed to export blacklist", ex);
yield new Error("Failed to export blacklist");
}
finally {
Files.deleteIfExists(tmpFile);
}
yield new ExportDomains(fid);
}
case ExportDomains(FileStorageId fid) -> {
var storage = storageService.getStorage(fid);
var tmpFile = Files.createTempFile(storage.asPath(), "export", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT DOMAIN_NAME, ID, INDEXED, STATE FROM EC_DOMAIN");
)
{
stmt.setFetchSize(1000);
var rs = stmt.executeQuery();
while (rs.next()) {
bw.write(rs.getString("DOMAIN_NAME"));
bw.write(",");
bw.write(rs.getString("ID"));
bw.write(",");
bw.write(rs.getString("INDEXED"));
bw.write(",");
bw.write(rs.getString("STATE"));
bw.write("\n");
}
Files.move(tmpFile, storage.asPath().resolve(domainsFilename), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception ex) {
logger.error("Failed to export domains", ex);
yield new Error("Failed to export domains");
}
finally {
Files.deleteIfExists(tmpFile);
}
yield new ExportLinkGraph(fid);
}
case ExportLinkGraph(FileStorageId fid) -> {
var storage = storageService.getStorage(fid);
var tmpFile = Files.createTempFile(storage.asPath(), "export", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK");
)
{
stmt.setFetchSize(1000);
var rs = stmt.executeQuery();
while (rs.next()) {
bw.write(rs.getString("SOURCE_DOMAIN_ID"));
bw.write(",");
bw.write(rs.getString("DEST_DOMAIN_ID"));
bw.write("\n");
}
Files.move(tmpFile, storage.asPath().resolve(linkGraphFilename), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception ex) {
logger.error("Failed to export link graph", ex);
yield new Error("Failed to export link graph");
}
finally {
Files.deleteIfExists(tmpFile);
}
yield new End();
}
default -> new Error();
};
}
@Override
public String describe() {
return "Export data from the database to a storage area of type EXPORT.";
}
@Inject
public ExportDataActor(ActorStateFactory stateFactory,
public ExportDataActor(Gson gson,
FileStorageService storageService,
HikariDataSource dataSource)
{
super(stateFactory);
super(gson);
this.storageService = storageService;
this.dataSource = dataSource;
}
@ActorState(name = INITIAL,
next = EXPORT_BLACKLIST,
description = """
Find EXPORT storage area, then transition to EXPORT-BLACKLIST.
""")
public Message init(Integer i) throws Exception {
var storage = storageService.getStorageByType(FileStorageType.EXPORT);
if (storage == null) error("Bad storage id");
return new Message().withStorageId(storage.id());
}
@ActorState(name = EXPORT_BLACKLIST,
next = EXPORT_DOMAINS,
resume = ActorResumeBehavior.ERROR,
description = """
Export the blacklist from the database to the EXPORT storage area.
"""
)
public Message exportBlacklist(Message message) throws Exception {
var storage = storageService.getStorage(message.storageId);
var tmpFile = Files.createTempFile(storage.asPath(), "export", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT URL_DOMAIN FROM EC_DOMAIN_BLACKLIST");
)
{
stmt.setFetchSize(1000);
var rs = stmt.executeQuery();
while (rs.next()) {
bw.write(rs.getString(1));
bw.write("\n");
}
Files.move(tmpFile, storage.asPath().resolve(blacklistFilename), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception ex) {
logger.error("Failed to export blacklist", ex);
error("Failed to export blacklist");
}
finally {
Files.deleteIfExists(tmpFile);
}
return message;
}
@ActorState(
name = EXPORT_DOMAINS,
next = EXPORT_LINK_GRAPH,
resume = ActorResumeBehavior.RETRY,
description = """
Export known domains to the EXPORT storage area.
"""
)
public Message exportDomains(Message message) throws Exception {
var storage = storageService.getStorage(message.storageId);
var tmpFile = Files.createTempFile(storage.asPath(), "export", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT DOMAIN_NAME, ID, INDEXED, STATE FROM EC_DOMAIN");
)
{
stmt.setFetchSize(1000);
var rs = stmt.executeQuery();
while (rs.next()) {
bw.write(rs.getString("DOMAIN_NAME"));
bw.write(",");
bw.write(rs.getString("ID"));
bw.write(",");
bw.write(rs.getString("INDEXED"));
bw.write(",");
bw.write(rs.getString("STATE"));
bw.write("\n");
}
Files.move(tmpFile, storage.asPath().resolve(domainsFilename), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception ex) {
logger.error("Failed to export domains", ex);
error("Failed to export domains");
}
finally {
Files.deleteIfExists(tmpFile);
}
return message;
}
@ActorState(
name = EXPORT_LINK_GRAPH,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Export known domains to the EXPORT storage area.
"""
)
public Message exportLinkGraph(Message message) throws Exception {
var storage = storageService.getStorage(message.storageId);
var tmpFile = Files.createTempFile(storage.asPath(), "export", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK");
)
{
stmt.setFetchSize(1000);
var rs = stmt.executeQuery();
while (rs.next()) {
bw.write(rs.getString("SOURCE_DOMAIN_ID"));
bw.write(",");
bw.write(rs.getString("DEST_DOMAIN_ID"));
bw.write("\n");
}
Files.move(tmpFile, storage.asPath().resolve(linkGraphFilename), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
}
catch (Exception ex) {
logger.error("Failed to export link graph", ex);
error("Failed to export link graph");
}
finally {
Files.deleteIfExists(tmpFile);
}
return message;
}
}

View File

@ -3,13 +3,10 @@ package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.actor.state.Resume;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.storage.FileStorageService;
@ -19,19 +16,12 @@ import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.svc.DomainListRefreshService;
import org.jetbrains.annotations.NotNull;
import java.nio.file.Files;
@Singleton
public class RecrawlActor extends AbstractActorPrototype {
public class RecrawlActor extends RecordActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String CRAWL = "CRAWL";
public static final String CRAWL_WAIT = "CRAWL-WAIT";
public static final String END = "END";
private final MqOutbox mqCrawlerOutbox;
private final FileStorageService storageService;
private final DomainListRefreshService refreshService;
@ -39,36 +29,55 @@ public class RecrawlActor extends AbstractActorPrototype {
private final ActorProcessWatcher processWatcher;
@AllArgsConstructor @With @NoArgsConstructor
public static class RecrawlMessage {
/** The storage where the crawl data will be stored. If this contains existing crawl
* data, it will be consulted for e.g. e-tag comparisons.
*/
@NotNull
public FileStorageId crawlStorageId;
public record Initial(FileStorageId storageId) implements ActorStep {}
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Crawl(long messageId) implements ActorStep {}
public long crawlerMsgId = 0L;
};
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial (FileStorageId fid) -> {
var crawlStorage = storageService.getStorage(fid);
if (crawlStorage == null) yield new Error("Bad storage id");
if (crawlStorage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + crawlStorage.type());
Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));
refreshService.synchronizeDomainList();
var request = new CrawlRequest(null, fid);
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
yield new Crawl(id);
}
case Crawl (long msgId) -> {
var rsp = processWatcher.waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, msgId);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Crawler failed");
}
else {
yield new End();
}
}
default -> new End();
};
}
@Override
public String describe() {
return "Run the crawler with the given crawl spec using previous crawl data for a reference";
}
public static RecrawlMessage recrawlFromCrawlDataAndCrawlSpec(FileStorageId crawlData) {
return new RecrawlMessage(crawlData, 0L);
}
@Inject
public RecrawlActor(ActorStateFactory stateFactory,
ActorProcessWatcher processWatcher,
public RecrawlActor(ActorProcessWatcher processWatcher,
ProcessOutboxes processOutboxes,
FileStorageService storageService,
DomainListRefreshService refreshService,
Gson gson
)
Gson gson)
{
super(stateFactory);
super(gson);
this.processWatcher = processWatcher;
this.mqCrawlerOutbox = processOutboxes.getCrawlerOutbox();
this.storageService = storageService;
@ -76,59 +85,4 @@ public class RecrawlActor extends AbstractActorPrototype {
this.gson = gson;
}
@ActorState(name = INITIAL,
next = CRAWL,
description = """
Validate the input and transition to CRAWL
""")
public RecrawlMessage init(RecrawlMessage recrawlMessage) throws Exception {
if (null == recrawlMessage) {
error("This Actor requires a message as an argument");
}
var crawlStorage = storageService.getStorage(recrawlMessage.crawlStorageId);
if (crawlStorage == null) error("Bad storage id");
if (crawlStorage.type() != FileStorageType.CRAWL_DATA) error("Bad storage type " + crawlStorage.type());
Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));
refreshService.synchronizeDomainList();
return recrawlMessage;
}
@ActorState(name = CRAWL,
next = CRAWL_WAIT,
resume = ActorResumeBehavior.ERROR,
description = """
Send a crawl request to the crawler and transition to CRAWL_WAIT.
"""
)
public RecrawlMessage crawl(RecrawlMessage recrawlMessage) throws Exception {
// Pre-send crawl request
var request = new CrawlRequest(null, recrawlMessage.crawlStorageId);
long id = mqCrawlerOutbox.sendAsync(CrawlRequest.class.getSimpleName(), gson.toJson(request));
return recrawlMessage.withCrawlerMsgId(id);
}
@ActorState(
name = CRAWL_WAIT,
next = END,
resume = ActorResumeBehavior.RETRY,
description = """
Wait for the crawler to finish retrieving the data.
"""
)
public RecrawlMessage crawlerWait(RecrawlMessage recrawlMessage) throws Exception {
var rsp = processWatcher.waitResponse(mqCrawlerOutbox, ProcessService.ProcessId.CRAWLER, recrawlMessage.crawlerMsgId);
if (rsp.state() != MqMessageState.OK)
error("Crawler failed");
return recrawlMessage;
}
}

View File

@ -1,53 +1,57 @@
package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.svc.BackupService;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.mq.persistence.MqPersistence;
public class RestoreBackupActor extends AbstractActorPrototype {
// States
public static final String RESTORE = "RESTORE";
public static final String END = "END";
public class RestoreBackupActor extends RecordActorPrototype {
private final BackupService backupService;
private final int node;
private final MqPersistence mqPersistence;
public record Restore(FileStorageId fid) implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Restore(FileStorageId fid) -> {
backupService.restoreBackup(fid);
mqPersistence.sendNewMessage(
ExecutorActor.CONVERT_AND_LOAD.id() + ":" + node,
null,
null,
ConvertAndLoadActor.REPARTITION,
"",
null);
yield new End();
}
default -> new Error();
};
}
@Override
public String describe() {
return "Restores a backed up set of index data";
}
@Inject
public RestoreBackupActor(ActorStateFactory stateFactory,
public RestoreBackupActor(Gson gson,
MqPersistence mqPersistence,
BackupService backupService,
ServiceConfiguration configuration
) {
super(stateFactory);
super(gson);
this.mqPersistence = mqPersistence;
this.backupService = backupService;
this.node = configuration.node();
}
@ActorState(name=RESTORE, next = END, resume = ActorResumeBehavior.ERROR)
public void restoreBackup(FileStorageId id) throws Exception {
backupService.restoreBackup(id);
mqPersistence.sendNewMessage(
ExecutorActor.CONVERT_AND_LOAD.id() + ":" + node,
null,
null,
ConvertAndLoadActor.REPARTITION,
"",
null);
}
}

View File

@ -1,72 +0,0 @@
package nu.marginalia.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Singleton
public class TransferDomainsActor extends AbstractActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String END = "END";
private final FileStorageService storageService;
private final ExecutorClient executorClient;
private final MqPersistence persistence;
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
private final int nodeId;
private final String executorServiceName;
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
int sourceNode;
int count;
};
@Override
public String describe() {
return "Transfers domains between nodes' crawl data sets";
}
@Inject
public TransferDomainsActor(ActorStateFactory stateFactory,
ServiceConfiguration configuration,
FileStorageService storageService,
ExecutorClient executorClient,
MqPersistence persistence,
HikariDataSource dataSource)
{
super(stateFactory);
this.storageService = storageService;
this.executorClient = executorClient;
this.persistence = persistence;
this.dataSource = dataSource;
this.nodeId = configuration.node();
this.executorServiceName = configuration.serviceName();
}
@ActorState(name = INITIAL,
next = END,
description = """
Transfer the domains
""")
public void init(Message message) throws Exception {
}
}

View File

@ -1,11 +1,10 @@
package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.process.ProcessService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,20 +14,43 @@ import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
@Singleton
public class TriggerAdjacencyCalculationActor extends AbstractActorPrototype {
public class TriggerAdjacencyCalculationActor extends RecordActorPrototype {
private final Logger logger = LoggerFactory.getLogger(getClass());
// STATES
private static final String INITIAL = "INITIAL";
private static final String END = "END";
private final ProcessService processService;
private final ExecutorService executor = Executors.newSingleThreadExecutor();
public record Initial() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch (self) {
case Initial() -> {
AtomicBoolean hasError = new AtomicBoolean(false);
var future = executor.submit(() -> {
try {
processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load");
}
catch (Exception ex) {
logger.warn("Error triggering adjacency calculation", ex);
hasError.set(true);
}
});
future.get();
if (hasError.get()) {
yield new Error("Error triggering adjacency calculation");
}
yield new End();
}
default -> new Error();
};
}
@Inject
public TriggerAdjacencyCalculationActor(ActorStateFactory stateFactory,
public TriggerAdjacencyCalculationActor(Gson gson,
ProcessService processService) {
super(stateFactory);
super(gson);
this.processService = processService;
}
@ -37,28 +59,4 @@ public class TriggerAdjacencyCalculationActor extends AbstractActorPrototype {
return "Calculate website similarities";
}
@ActorState(name = INITIAL, next = END,
resume = ActorResumeBehavior.ERROR,
description = """
Spawns a WebsitesAdjacenciesCalculator process and waits for it to finish.
"""
)
public void init(Integer unused) throws Exception {
AtomicBoolean hasError = new AtomicBoolean(false);
var future = executor.submit(() -> {
try {
processService.trigger(ProcessService.ProcessId.ADJACENCIES_CALCULATOR, "load");
}
catch (Exception ex) {
logger.warn("Error triggering adjacency calculation", ex);
hasError.set(true);
}
});
future.get();
if (hasError.get()) {
error("Error triggering adjacency calculation");
}
}
}

View File

@ -1,81 +0,0 @@
package nu.marginalia.actor.task;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.storage.model.FileStorageId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
@Singleton
public class TruncateLinkDatabase extends AbstractActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String FLUSH_DATABASE = "FLUSH_DATABASE";
public static final String END = "END";
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
public FileStorageId storageId = null;
};
@Override
public String describe() {
return "Remove all data from the link database.";
}
@Inject
public TruncateLinkDatabase(ActorStateFactory stateFactory,
HikariDataSource dataSource)
{
super(stateFactory);
this.dataSource = dataSource;
}
@ActorState(name = INITIAL,
next = FLUSH_DATABASE,
description = """
Initial stage
""")
public void init(Integer i) throws Exception {
}
@ActorState(name = FLUSH_DATABASE,
next = END,
resume = ActorResumeBehavior.ERROR,
description = """
Truncate the domain and link tables.
"""
)
public void flushDatabase() throws Exception {
try (var conn = dataSource.getConnection();
var stmt = conn.createStatement())
{
stmt.executeUpdate("SET FOREIGN_KEY_CHECKS = 0");
stmt.executeUpdate("TRUNCATE TABLE EC_DOMAIN_LINK");
stmt.executeUpdate("TRUNCATE TABLE DOMAIN_METADATA");
stmt.executeUpdate("SET FOREIGN_KEY_CHECKS = 1");
}
catch (SQLException ex) {
logger.error("Failed to truncate tables", ex);
error("Failed to truncate tables");
}
}
}

View File

@ -5,7 +5,6 @@ import com.google.inject.Inject;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ActorApi;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.actor.state.ActorStateInstance;
import nu.marginalia.executor.model.ActorRunState;
import nu.marginalia.executor.model.ActorRunStates;
@ -27,7 +26,6 @@ import spark.Spark;
import java.io.IOException;
import java.sql.SQLException;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
@ -131,12 +129,7 @@ public class ExecutorSvc extends Service {
final String machineName = e.getKey().name();
final String stateName = state.name();
final String stateDescription = actorStateDescriptions.computeIfAbsent(
(machineName + "." + stateName),
k -> Optional.ofNullable(stateGraph.declaredStates().get(stateName))
.map(ActorState::description)
.orElse("Description missing for " + stateName)
);
final String stateDescription = "";
final boolean terminal = state.isFinal();
final boolean canStart = actorControlService.isDirectlyInitializable(e.getKey()) && terminal;

View File

@ -18,7 +18,7 @@ public class BackupService {
public Object restore(Request request, Response response) throws Exception {
var fid = FileStorageId.parse(request.params("fid"));
actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, RestoreBackupActor.RESTORE, fid);
actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, new RestoreBackupActor.Restore(fid));
return "";
}
}

View File

@ -4,10 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.Inject;
import nu.marginalia.actor.ExecutorActor;
import nu.marginalia.actor.ExecutorActorControlService;
import nu.marginalia.actor.task.ConvertActor;
import nu.marginalia.actor.task.ConvertAndLoadActor;
import nu.marginalia.actor.task.CrawlJobExtractorActor;
import nu.marginalia.actor.task.RecrawlActor;
import nu.marginalia.actor.task.*;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.executor.model.load.LoadParameters;
import spark.Request;
@ -27,33 +24,34 @@ public class ProcessingService {
public Object startRecrawl(Request request, Response response) throws Exception {
var crawlId = gson.fromJson(request.body(), FileStorageId.class);
actorControlService.start(
actorControlService.startFrom(
ExecutorActor.RECRAWL,
RecrawlActor.recrawlFromCrawlDataAndCrawlSpec(crawlId)
new RecrawlActor.Initial(crawlId)
);
return "";
}
public Object startCrawl(Request request, Response response) throws Exception {
actorControlService.start(ExecutorActor.CRAWL, FileStorageId.parse(request.params("fid")));
actorControlService.startFrom(ExecutorActor.CRAWL,
new CrawlActor.Initial(FileStorageId.parse(request.params("fid"))));
return "";
}
public Object startConversion(Request request, Response response) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
ConvertActor.CONVERT,
FileStorageId.parse(request.params("fid")));
new ConvertActor.Convert(FileStorageId.parse(request.params("fid"))));
return "";
}
public Object startConvertLoad(Request request, Response response) throws Exception {
actorControlService.start(
actorControlService.startFrom(
ExecutorActor.CONVERT_AND_LOAD,
FileStorageId.parse(request.params("fid"))
new ConvertAndLoadActor.Initial(FileStorageId.parse(request.params("fid")))
);
return "";
}
@ -64,10 +62,7 @@ public class ProcessingService {
// Start the FSM from the intermediate state that triggers the load
actorControlService.startFrom(
ExecutorActor.CONVERT_AND_LOAD,
ConvertAndLoadActor.LOAD,
new ConvertAndLoadActor.Message(null, params.ids(),
0L,
0L)
new ConvertAndLoadActor.Load(params.ids())
);
return "";
@ -79,8 +74,8 @@ public class ProcessingService {
}
public Object createCrawlSpecFromDownload(Request request, Response response) throws Exception {
actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR, CrawlJobExtractorActor.CREATE_FROM_LINK,
new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL(
actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR,
new CrawlJobExtractorActor.CreateFromUrl(
request.queryParamOrDefault("description", ""),
request.queryParamOrDefault("url", ""))
);

View File

@ -16,17 +16,17 @@ public class SideloadService {
}
public Object sideloadDirtree(Request request, Response response) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_DIRTREE, request.queryParams("path"));
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertDirtree(request.queryParams("path")));
return "";
}
public Object sideloadEncyclopedia(Request request, Response response) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_ENCYCLOPEDIA, request.queryParams("path"));
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertEncyclopedia(request.queryParams("path")));
return "";
}
public Object sideloadStackexchange(Request request, Response response) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT, ConvertActor.CONVERT_STACKEXCHANGE, request.queryParams("path"));
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertStackexchange(request.queryParams("path")));
return "";
}
}

View File

@ -68,7 +68,7 @@ public class ExecutorSvcApiIntegrationTest {
@Test
public void startStartActor() throws Exception {
testInstances.client.startFsm(Context.internal(), 0, "crawl");
Mockito.verify(testInstances.actorControlService).startJSON(ExecutorActor.CRAWL, "\"\"");
Mockito.verify(testInstances.actorControlService).start(ExecutorActor.CRAWL);
}
@Test
@ -82,7 +82,7 @@ public class ExecutorSvcApiIntegrationTest {
public void triggerCrawl() throws Exception {
testInstances.client.triggerCrawl(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CRAWL), any());
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL), any());
}
@Test
@ -90,7 +90,7 @@ public class ExecutorSvcApiIntegrationTest {
testInstances.client.triggerRecrawl(Context.internal(), 0,
new FileStorageId(0));
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.RECRAWL), any());
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RECRAWL), any());
}
@ -98,7 +98,7 @@ public class ExecutorSvcApiIntegrationTest {
public void triggerProcessAndLoad() throws Exception {
testInstances.client.triggerConvertAndLoad(Context.internal(), 0, FileStorageId.of(1));
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.CONVERT_AND_LOAD), any());
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT_AND_LOAD), any());
}
@Test
@ -108,37 +108,6 @@ public class ExecutorSvcApiIntegrationTest {
Mockito.verify(testInstances.actorControlService).start(eq(ExecutorActor.ADJACENCY_CALCULATION));
}
@Test
public void sideloadDirtree() throws Exception {
testInstances.client.sideloadDirtree(Context.internal(), 0, Path.of("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_DIRTREE"), eq("/tmp/test"));
}
@Test
public void sideloadEncyclopedia() throws Exception {
testInstances.client.sideloadEncyclopedia(Context.internal(), 0, Path.of("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_ENCYCLOPEDIA"), eq("/tmp/test"));
}
@Test
public void sideloadStackexchange() throws Exception {
testInstances.client.sideloadStackexchange(Context.internal(), 0, Path.of("/tmp/test"));
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT), eq("CONVERT_STACKEXCHANGE"), eq("/tmp/test"));
}
@Test
public void testCreateCrawlSpecFromUrl() throws Exception {
testInstances.client.createCrawlSpecFromDownload(Context.internal(), 0, "Lorem Ipsum", "http://www.example.com");
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL_JOB_EXTRACTOR), eq("CREATE_FROM_LINK"), eq(new CrawlJobExtractorActor.CrawlJobExtractorArgumentsWithURL("Lorem Ipsum", "http://www.example.com")));
}
@Test
public void backupRestore() throws Exception {
testInstances.client.restoreBackup(Context.internal(), 0, "1");
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RESTORE_BACKUP), eq("RESTORE"), eq(new FileStorageId(1)));
}
}
class TestInstances {