diff --git a/code/api/executor-api/build.gradle b/code/api/executor-api/build.gradle index 28defeb2..257c0285 100644 --- a/code/api/executor-api/build.gradle +++ b/code/api/executor-api/build.gradle @@ -1,6 +1,7 @@ plugins { id 'java' id 'jvm-test-suite' + id "com.google.protobuf" version "0.9.4" } java { @@ -9,6 +10,18 @@ java { } } +apply from: "$rootProject.projectDir/protobuf.gradle" + +sourceSets { + main { + proto { + srcDir 'src/main/protobuf' + } + } +} + + + dependencies { implementation project(':code:common:model') implementation project(':code:api:index-api') @@ -25,6 +38,8 @@ dependencies { implementation libs.guice implementation libs.rxjava implementation libs.protobuf + implementation libs.bundles.grpc + implementation libs.javax.annotation implementation libs.gson testImplementation libs.bundles.slf4j.test diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java index df9c4072..c9b286d5 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java @@ -1,154 +1,282 @@ package nu.marginalia.executor.client; import com.google.inject.Inject; -import io.reactivex.rxjava3.core.Observable; +import com.google.inject.Singleton; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; -import nu.marginalia.client.exception.RouteNotConfiguredException; +import nu.marginalia.client.grpc.GrpcStubPool; +import nu.marginalia.executor.api.*; +import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub; +import nu.marginalia.executor.model.ActorRunState; import nu.marginalia.executor.model.ActorRunStates; -import nu.marginalia.executor.model.load.LoadParameters; import nu.marginalia.executor.model.transfer.TransferItem; import nu.marginalia.executor.model.transfer.TransferSpec; import nu.marginalia.executor.storage.FileStorageContent; +import nu.marginalia.executor.storage.FileStorageFile; import nu.marginalia.executor.upload.UploadDirContents; +import nu.marginalia.executor.upload.UploadDirItem; import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.nodecfg.model.NodeConfiguration; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; -import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; +import io.grpc.ManagedChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.OutputStream; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +@Singleton public class ExecutorClient extends AbstractDynamicClient { + private final GrpcStubPool stubPool; + private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class); + @Inject - public ExecutorClient(ServiceDescriptors descriptors) { + public ExecutorClient(ServiceDescriptors descriptors, NodeConfigurationService nodeConfigurationService) { super(descriptors.forId(ServiceId.Executor), GsonFactory::get); + + stubPool = new GrpcStubPool<>(ServiceId.Executor) { + @Override + public ExecutorApiBlockingStub createStub(ManagedChannel channel) { + return ExecutorApiGrpc.newBlockingStub(channel); + } + + @Override + public List getEligibleNodes() { + return nodeConfigurationService.getAll() + .stream() + .map(NodeConfiguration::node) + .toList(); + } + }; } - public void startFsm(Context ctx, int node, String actorName) { - post(ctx, node, "/actor/"+actorName+"/start", "").blockingSubscribe(); + public void startFsm(int node, String actorName) { + stubPool.apiForNode(node).startFsm( + RpcFsmName.newBuilder() + .setActorName(actorName) + .build() + ); } - public void stopFsm(Context ctx, int node, String actorName) { - post(ctx, node, "/actor/"+actorName+"/stop", "").blockingSubscribe(); + public void stopFsm(int node, String actorName) { + stubPool.apiForNode(node).stopFsm( + RpcFsmName.newBuilder() + .setActorName(actorName) + .build() + ); } - public void stopProcess(Context ctx, int node, String id) { - post(ctx, node, "/process/" + id + "/stop", "").blockingSubscribe(); + public void stopProcess(int node, String id) { + stubPool.apiForNode(node).stopProcess( + RpcProcessId.newBuilder() + .setProcessId(id) + .build() + ); } - - public void triggerCrawl(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/crawl/" + fid, "").blockingSubscribe(); + public void triggerCrawl(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerCrawl( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void triggerRecrawl(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/recrawl", fid).blockingSubscribe(); + public void triggerRecrawl(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerRecrawl( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void triggerConvert(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/convert/" + fid.id(), "").blockingSubscribe(); + public void triggerConvert(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerConvert( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void triggerConvertAndLoad(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/convert-load/" + fid.id(), "").blockingSubscribe(); + public void triggerConvertAndLoad(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerConvertAndLoad( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void loadProcessedData(Context ctx, int node, LoadParameters ids) { - post(ctx, node, "/process/load", ids).blockingSubscribe(); + public void loadProcessedData(int node, List ids) { + stubPool.apiForNode(node).loadProcessedData( + RpcFileStorageIds.newBuilder() + .addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList()) + .build() + ); } - public void calculateAdjacencies(Context ctx, int node) { - post(ctx, node, "/process/adjacency-calculation", "").blockingSubscribe(); + public void calculateAdjacencies(int node) { + stubPool.apiForNode(node).calculateAdjacencies(Empty.getDefaultInstance()); } - public void sideloadEncyclopedia(Context ctx, int node, Path sourcePath, String baseUrl) { - post(ctx, node, - "/sideload/encyclopedia?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8) + "&baseUrl=" + URLEncoder.encode(baseUrl, StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) { + stubPool.apiForNode(node).sideloadEncyclopedia( + RpcSideloadEncyclopedia.newBuilder() + .setBaseUrl(baseUrl) + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void sideloadDirtree(Context ctx, int node, Path sourcePath) { - post(ctx, node, - "/sideload/dirtree?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadDirtree(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadDirtree( + RpcSideloadDirtree.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void sideloadWarc(Context ctx, int node, Path sourcePath) { - post(ctx, node, - "/sideload/warc?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadWarc(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadWarc( + RpcSideloadWarc.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void sideloadStackexchange(Context ctx, int node, Path sourcePath) { - post(ctx, node, - "/sideload/stackexchange?path="+URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadStackexchange(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadStackexchange( + RpcSideloadStackexchange.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void createCrawlSpecFromDownload(Context context, int node, String description, String url) { - post(context, node, "/process/crawl-spec/from-download?description="+URLEncoder.encode(description, StandardCharsets.UTF_8)+"&url="+URLEncoder.encode(url, StandardCharsets.UTF_8), "") - .blockingSubscribe(); + public void createCrawlSpecFromDownload(int node, String description, String url) { + stubPool.apiForNode(node).createCrawlSpecFromDownload( + RpcCrawlSpecFromDownload.newBuilder() + .setDescription(description) + .setUrl(url) + .build() + ); } - public void exportAtags(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/export/atags?fid="+fid, "").blockingSubscribe(); + public void exportAtags(int node, FileStorageId fid) { + stubPool.apiForNode(node).exportAtags( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void exportSampleData(Context ctx, int node, FileStorageId fid, int size, String name) { - post(ctx, node, "/export/sample-data?fid="+fid+"&size="+size+"&name="+URLEncoder.encode(name, StandardCharsets.UTF_8), "").blockingSubscribe(); + public void exportSampleData(int node, FileStorageId fid, int size, String name) { + stubPool.apiForNode(node).exportSampleData( + RpcExportSampleData.newBuilder() + .setFileStorageId(fid.id()) + .setSize(size) + .setName(name) + .build() + ); } - public void exportRssFeeds(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/export/feeds?fid="+fid, "").blockingSubscribe(); + public void exportRssFeeds(int node, FileStorageId fid) { + stubPool.apiForNode(node).exportRssFeeds( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void exportTermFrequencies(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/export/termfreq?fid="+fid, "").blockingSubscribe(); + public void exportTermFrequencies(int node, FileStorageId fid) { + stubPool.apiForNode(node).exportTermFrequencies( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void downloadSampleData(Context ctx, int node, String sampleSet) { - post(ctx, node, "/action/download-sample-data?set="+URLEncoder.encode(sampleSet, StandardCharsets.UTF_8), "").blockingSubscribe(); + public void downloadSampleData(int node, String sampleSet) { + stubPool.apiForNode(node).downloadSampleData( + RpcDownloadSampleData.newBuilder() + .setSampleSet(sampleSet) + .build() + ); } - public void exportData(Context ctx, int node) { - post(ctx, node, "/export/data", "").blockingSubscribe(); + public void exportData(int node) { + stubPool.apiForNode(node).exportData(Empty.getDefaultInstance()); } - public void restoreBackup(Context context, int node, FileStorageId fid) { - post(context, node, "/backup/" + fid + "/restore", "").blockingSubscribe(); + public void restoreBackup(int node, FileStorageId fid) { + stubPool.apiForNode(node).restoreBackup( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public ActorRunStates getActorStates(Context context, int node) { + public ActorRunStates getActorStates(int node) { try { - return get(context, node, "/actor", ActorRunStates.class).blockingFirst(); + var rs = stubPool.apiForNode(node).getActorStates(Empty.getDefaultInstance()); + var states = rs.getActorRunStatesList().stream() + .map(r -> new ActorRunState( + r.getActorName(), + r.getState(), + r.getActorDescription(), + r.getStateDescription(), + r.getTerminal(), + r.getCanStart()) + ) + .toList(); + + return new ActorRunStates(node, states); } - catch (RouteNotConfiguredException ex) { - // node is down, return dummy data - return new ActorRunStates(node, new ArrayList<>()); + catch (Exception ex) { + logger.warn("Failed to get actor states", ex); + + // Return an empty list of states to avoid breaking the UI when a node is down + return new ActorRunStates(node, List.of()); } } - public UploadDirContents listSideloadDir(Context context, int node) { + public UploadDirContents listSideloadDir(int node) { try { - return get(context, node, "/sideload/", UploadDirContents.class).blockingFirst(); + var rs = stubPool.apiForNode(node).listSideloadDir(Empty.getDefaultInstance()); + var items = rs.getEntriesList().stream() + .map(i -> new UploadDirItem(i.getName(), i.getLastModifiedTime(), i.getIsDirectory(), i.getSize())) + .toList(); + return new UploadDirContents(rs.getPath(), items); } - catch (RouteNotConfiguredException ex) { - // node is down, return dummy data - return new UploadDirContents("/", new ArrayList<>()); + catch (Exception ex) { + logger.warn("Failed to list sideload dir", ex); + + // Return an empty list of items to avoid breaking the UI when a node is down + return new UploadDirContents("", List.of()); } } - public FileStorageContent listFileStorage(Context context, int node, FileStorageId fileId) { + public FileStorageContent listFileStorage(int node, FileStorageId fileId) { try { - return get(context, node, "/storage/"+fileId.id(), FileStorageContent.class).blockingFirst(); + var rs = stubPool.apiForNode(node).listFileStorage( + RpcFileStorageId.newBuilder() + .setFileStorageId(fileId.id()) + .build() + ); + + return new FileStorageContent(rs.getEntriesList().stream() + .map(e -> new FileStorageFile(e.getName(), e.getSize(), e.getLastModifiedTime())) + .toList()); } - catch (RouteNotConfiguredException ex) { - // node is down, return dummy data - return new FileStorageContent(new ArrayList<>()); + catch (Exception ex) { + logger.warn("Failed to list file storage", ex); + + // Return an empty list of items to avoid breaking the UI when a node is down + return new FileStorageContent(List.of()); } } diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java index ffbe168c..3bdbe95c 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java @@ -1,7 +1,7 @@ package nu.marginalia.executor.client; import com.google.inject.Inject; -import jakarta.inject.Singleton; +import com.google.inject.Singleton; import nu.marginalia.model.gson.GsonFactory; import com.google.gson.Gson; import nu.marginalia.mq.MqMessage; diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java deleted file mode 100644 index 1874406b..00000000 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java +++ /dev/null @@ -1,10 +0,0 @@ -package nu.marginalia.executor.model.load; - -import nu.marginalia.storage.model.FileStorageId; - -import java.util.List; - -public record LoadParameters( - List ids -) { -} diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java index bb6a6f24..c1953f8f 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java @@ -1,29 +1,9 @@ package nu.marginalia.executor.upload; -import lombok.SneakyThrows; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; - public record UploadDirItem ( String name, String lastModifiedTime, boolean isDirectory, long size ) { - - @SneakyThrows - public static UploadDirItem fromPath(Path path) { - boolean isDir = Files.isDirectory(path); - long size = isDir ? 0 : Files.size(path); - var mtime = Files.getLastModifiedTime(path); - - - return new UploadDirItem(path.toString(), - LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME), isDir, size); - } - } diff --git a/code/api/executor-api/src/main/protobuf/executor-api.proto b/code/api/executor-api/src/main/protobuf/executor-api.proto new file mode 100644 index 00000000..bc05844e --- /dev/null +++ b/code/api/executor-api/src/main/protobuf/executor-api.proto @@ -0,0 +1,105 @@ +syntax="proto3"; +package actorapi; + +option java_package="nu.marginalia.executor.api"; +option java_multiple_files=true; + +service ExecutorApi { + rpc startFsm(RpcFsmName) returns (Empty) {} + rpc stopFsm(RpcFsmName) returns (Empty) {} + + rpc stopProcess(RpcProcessId) returns (Empty) {} + + rpc triggerCrawl(RpcFileStorageId) returns (Empty) {} + rpc triggerRecrawl(RpcFileStorageId) returns (Empty) {} + rpc triggerConvert(RpcFileStorageId) returns (Empty) {} + rpc triggerConvertAndLoad(RpcFileStorageId) returns (Empty) {} + rpc loadProcessedData(RpcFileStorageIds) returns (Empty) {} + rpc calculateAdjacencies(Empty) returns (Empty) {} + + rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {} + rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {} + rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {} + rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {} + + rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {} + rpc exportAtags(RpcFileStorageId) returns (Empty) {} + rpc exportSampleData(RpcExportSampleData) returns (Empty) {} + rpc exportRssFeeds(RpcFileStorageId) returns (Empty) {} + rpc exportTermFrequencies(RpcFileStorageId) returns (Empty) {} + rpc downloadSampleData(RpcDownloadSampleData) returns (Empty) {} + rpc exportData(Empty) returns (Empty) {} + rpc restoreBackup(RpcFileStorageId) returns (Empty) {} + rpc getActorStates(Empty) returns (RpcActorRunStates) {} + rpc listSideloadDir(Empty) returns (RpcUploadDirContents) {} + rpc listFileStorage(RpcFileStorageId) returns (RpcFileStorageContent) {} +} + +message Empty {} +message RpcFsmName { + string actorName = 1; +} +message RpcProcessId { + string processId = 1; +} +message RpcFileStorageId { + int64 fileStorageId = 1; +} +message RpcFileStorageIds { + repeated int64 fileStorageIds = 1; +} +message RpcSideloadEncyclopedia { + string sourcePath = 1; + string baseUrl = 2; +} +message RpcSideloadDirtree { + string sourcePath = 1; +} +message RpcSideloadWarc { + string sourcePath = 1; +} +message RpcSideloadStackexchange { + string sourcePath = 1; +} +message RpcCrawlSpecFromDownload { + string description = 1; + string url = 2; +} +message RpcExportSampleData { + int64 fileStorageId = 1; + int32 size = 2; + string name = 3; +} +message RpcDownloadSampleData { + string sampleSet = 1; +} +message RpcActorRunStates { + int32 node = 1; + repeated RpcActorRunState actorRunStates = 2; +} +message RpcActorRunState { + string actorName = 1; + string state = 2; + string actorDescription = 3; + string stateDescription = 4; + bool terminal = 5; + bool canStart = 6; +} +message RpcUploadDirContents { + string path = 1; + repeated RpcUploadDirEntry entries = 2; +} +message RpcUploadDirEntry { + string name = 1; + string lastModifiedTime = 2; + bool isDirectory = 3; + int64 size = 4; +} +message RpcFileStorageContent { + repeated RpcFileStorageEntry entries = 1; +} +message RpcFileStorageEntry { + string name = 1; + int64 size = 2; + string lastModifiedTime = 3; +} diff --git a/code/api/index-api/build.gradle b/code/api/index-api/build.gradle index 51205960..08e629a4 100644 --- a/code/api/index-api/build.gradle +++ b/code/api/index-api/build.gradle @@ -17,6 +17,9 @@ sourceSets { } } } + +apply from: "$rootProject.projectDir/protobuf.gradle" + dependencies { implementation project(':code:common:model') implementation project(':code:common:config') @@ -42,30 +45,5 @@ dependencies { testImplementation libs.mockito } -protobuf { - protoc { - if (osdetector.os == "osx") { - artifact = "com.google.protobuf:protoc:3.0.2:osx-x86_64" - } else { - artifact = "com.google.protobuf:protoc:3.0.2" - } - } - plugins { - grpc { - if (osdetector.os == "osx") { - artifact = "io.grpc:protoc-gen-grpc-java:1.1.2:osx-x86_64" - } else { - artifact = "io.grpc:protoc-gen-grpc-java:1.1.2" - } - } - } - generateProtoTasks { - all().each { task -> - task.plugins { - grpc {} - } - } - } -} diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 808e2a1f..7c334b8e 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -20,7 +20,6 @@ import nu.marginalia.service.id.ServiceId; import java.util.List; import javax.annotation.CheckReturnValue; -import java.util.ServiceConfigurationError; import java.util.UUID; @Singleton @@ -39,7 +38,7 @@ public class IndexClient extends AbstractDynamicClient { super(descriptors.forId(ServiceId.Index), GsonFactory::get); this.messageQueueFactory = messageQueueFactory; - String inboxName = ServiceId.Index.name; + String inboxName = ServiceId.Index.serviceName; String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID()); setTimeout(30); diff --git a/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java b/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java index f31c7ab8..eb22f6f0 100644 --- a/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java +++ b/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java @@ -4,7 +4,7 @@ public record FileStorageId(long id) { public static FileStorageId parse(String str) { return new FileStorageId(Long.parseLong(str)); } - public static FileStorageId of(int storageId) { + public static FileStorageId of(long storageId) { return new FileStorageId(storageId); } diff --git a/code/common/service-client/build.gradle b/code/common/service-client/build.gradle index d34f362f..da1b1a71 100644 --- a/code/common/service-client/build.gradle +++ b/code/common/service-client/build.gradle @@ -24,6 +24,7 @@ dependencies { implementation libs.bundles.httpcomponents implementation libs.bundles.gson + implementation libs.bundles.grpc implementation libs.protobuf implementation libs.bundles.prometheus diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java b/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java index f4244112..b09ffa0c 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java @@ -114,7 +114,7 @@ public class ServiceMonitors { public boolean isServiceUp(ServiceId serviceId, int node) { synchronized (runningServices) { - return runningServices.contains(new ServiceNode(serviceId.name, node)); + return runningServices.contains(new ServiceNode(serviceId.serviceName, node)); } } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java b/code/common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java similarity index 60% rename from code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java rename to code/common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java index ed95b18c..0692c002 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java @@ -1,8 +1,8 @@ -package nu.marginalia.query; +package nu.marginalia.client.grpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import nu.marginalia.query.svc.NodeConfigurationWatcher; +import nu.marginalia.service.id.ServiceId; import java.util.List; import java.util.Map; @@ -10,26 +10,34 @@ import java.util.concurrent.*; import java.util.function.Function; import java.util.stream.Stream; -public abstract class QueryGrpcStubPool { +/** A pool of gRPC stubs for a service, with a separate stub for each node. + * Manages broadcast-style request. */ +public abstract class GrpcStubPool { + public GrpcStubPool(String serviceName) { + + this.serviceName = serviceName; + } + protected record ServiceAndNode(String service, int node) { public String getHostName() { return service+"-"+node; } } - private final NodeConfigurationWatcher nodeConfigurationWatcher; private final Map channels = new ConcurrentHashMap<>(); - private final Map actorRpcApis = new ConcurrentHashMap<>(); + private final Map apis = new ConcurrentHashMap<>(); private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor(); - QueryGrpcStubPool(NodeConfigurationWatcher nodeConfigurationWatcher) { - this.nodeConfigurationWatcher = nodeConfigurationWatcher; + private final String serviceName; + + public GrpcStubPool(ServiceId serviceId) { + this.serviceName = serviceId.serviceName; } /** Get an API stub for the given node */ - public STUB indexApi(int node) { - var san = new ServiceAndNode("index-service", node); - return actorRpcApis.computeIfAbsent(san, n -> + public STUB apiForNode(int node) { + var san = new ServiceAndNode(serviceName, node); + return apis.computeIfAbsent(san, n -> createStub(channels.computeIfAbsent(san, this::createChannel)) ); } @@ -41,8 +49,8 @@ public abstract class QueryGrpcStubPool { /** Invoke a function on each node, returning a list of futures in a terminal state, as per * ExecutorService$invokeAll */ public List> invokeAll(Function> callF) throws InterruptedException { - List> calls = nodeConfigurationWatcher.getQueryNodes().stream() - .map(id -> callF.apply(indexApi(id))) + List> calls = getEligibleNodes().stream() + .map(id -> callF.apply(apiForNode(id))) .toList(); return virtualExecutorService.invokeAll(calls); @@ -50,8 +58,8 @@ public abstract class QueryGrpcStubPool { /** Invoke a function on each node, returning a stream of results */ public Stream callEachSequential(Function call) { - return nodeConfigurationWatcher.getQueryNodes().stream() - .map(id -> call.apply(indexApi(id))); + return getEligibleNodes().stream() + .map(id -> call.apply(apiForNode(id))); } @@ -61,4 +69,7 @@ public abstract class QueryGrpcStubPool { */ public abstract STUB createStub(ManagedChannel channel); + /** Get the list of nodes that are eligible for broadcast-style requests */ + public abstract List getEligibleNodes(); + } diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java index 29eac14d..68008bb3 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java @@ -8,7 +8,7 @@ public class ServiceDescriptor { public ServiceDescriptor(ServiceId id) { this.id = id; - this.name = id.name; + this.name = id.serviceName; } public ServiceDescriptor(ServiceId id, String host) { diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java index 94aeb10b..26f52b6b 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java @@ -14,18 +14,19 @@ public enum ServiceId { Dating("dating-service"), Explorer("explorer-service"); - public final String name; - ServiceId(String name) { - this.name = name; + public final String serviceName; + + ServiceId(String serviceName) { + this.serviceName = serviceName; } public String withNode(int node) { - return name + ":" + node; + return serviceName + ":" + node; } public static ServiceId byName(String name) { for (ServiceId id : values()) { - if (id.name.equals(name)) { + if (id.serviceName.equals(name)) { return id; } } diff --git a/code/common/service/src/main/java/nu/marginalia/service/MainClass.java b/code/common/service/src/main/java/nu/marginalia/service/MainClass.java index 40303e48..be3080fb 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/MainClass.java +++ b/code/common/service/src/main/java/nu/marginalia/service/MainClass.java @@ -51,10 +51,10 @@ public abstract class MainClass { protected static void init(ServiceId id, String... args) { System.setProperty("log4j2.isThreadContextMapInheritable", "true"); System.setProperty("isThreadContextMapInheritable", "true"); - System.setProperty("service-name", id.name); + System.setProperty("service-name", id.serviceName); ConfigLoader.loadConfig( - ConfigLoader.getConfigPath(id.name) + ConfigLoader.getConfigPath(id.serviceName) ); initJdbc(); diff --git a/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java b/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java index df97b7b0..6e2b3399 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java +++ b/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java @@ -22,6 +22,6 @@ public record ServiceConfiguration(ServiceId serviceId, int metricsPort, UUID instanceUuid) { public String serviceName() { - return serviceId.name; + return serviceId.serviceName; } } diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java b/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java index b193b240..4de16cf0 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java @@ -1,7 +1,7 @@ package nu.marginalia.service.server; import com.google.inject.name.Named; -import jakarta.inject.Inject; +import com.google.inject.Inject; import lombok.SneakyThrows; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.nodecfg.NodeConfigurationService; diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java index 10f33808..b23be402 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java @@ -1,6 +1,6 @@ package nu.marginalia.control; -import jakarta.inject.Inject; +import com.google.inject.Inject; import spark.ResponseTransformer; import java.io.IOException; diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index a707aefe..cf8acaf4 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -2,11 +2,9 @@ package nu.marginalia.control.node.svc; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.client.Context; import nu.marginalia.control.ControlValidationError; import nu.marginalia.control.RedirectControl; import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.executor.model.load.LoadParameters; import nu.marginalia.index.client.IndexClient; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.storage.FileStorageService; @@ -106,7 +104,7 @@ public class ControlNodeActionsService { if (!Set.of("sample-s", "sample-m", "sample-l", "sample-xl").contains(set)) throw new ControlValidationError("Invalid sample specified", "A valid sample data set must be specified", ".."); - executorClient.downloadSampleData(Context.fromRequest(request), Integer.parseInt(request.params("node")), set); + executorClient.downloadSampleData(Integer.parseInt(request.params("node")), set); logger.info("Downloading sample data set {}", set); @@ -126,7 +124,7 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD ENCYCLOPEDIA " + nodeId); - executorClient.sideloadEncyclopedia(Context.fromRequest(request), nodeId, sourcePath, baseUrl); + executorClient.sideloadEncyclopedia(nodeId, sourcePath, baseUrl); return ""; } @@ -139,7 +137,7 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD DIRTREE " + nodeId); - executorClient.sideloadDirtree(Context.fromRequest(request), nodeId, sourcePath); + executorClient.sideloadDirtree(nodeId, sourcePath); return ""; } @@ -151,7 +149,7 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD WARC " + nodeId); - executorClient.sideloadWarc(Context.fromRequest(request), nodeId, sourcePath); + executorClient.sideloadWarc(nodeId, sourcePath); return ""; } @@ -166,7 +164,8 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD STACKEXCHANGE " + nodeId); - executorClient.sideloadStackexchange(Context.fromRequest(request), nodeId, sourcePath); + executorClient.sideloadStackexchange(nodeId, sourcePath); + return ""; } @@ -184,7 +183,6 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.CRAWL_DATA, toCrawl); executorClient.triggerRecrawl( - Context.fromRequest(request), nodeId, toCrawl ); @@ -199,11 +197,7 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.CRAWL_SPEC, toCrawl); - executorClient.triggerCrawl( - Context.fromRequest(request), - nodeId, - toCrawl - ); + executorClient.triggerCrawl(nodeId, toCrawl); return ""; } @@ -216,14 +210,10 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, toProcess); if (isAutoload) { - executorClient.triggerConvertAndLoad(Context.fromRequest(request), - nodeId, - toProcess); + executorClient.triggerConvertAndLoad(nodeId, toProcess); } else { - executorClient.triggerConvert(Context.fromRequest(request), - nodeId, - toProcess); + executorClient.triggerConvert(nodeId, toProcess); } return ""; @@ -241,10 +231,7 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, ids.toArray(new FileStorageId[0])); - executorClient.loadProcessedData(Context.fromRequest(request), - nodeId, - new LoadParameters(ids) - ); + executorClient.loadProcessedData(nodeId, ids); return ""; } @@ -254,7 +241,7 @@ public class ControlNodeActionsService { var toLoad = parseSourceFileStorageId(request.queryParams("source")); - executorClient.restoreBackup(Context.fromRequest(request), nodeId, toLoad); + executorClient.restoreBackup(nodeId, toLoad); return ""; } @@ -286,13 +273,13 @@ public class ControlNodeActionsService { throw new ControlValidationError("No url specified", "A url must be specified", ".."); } - executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), nodeId, description, url); + executorClient.createCrawlSpecFromDownload(nodeId, description, url); return ""; } private Object exportDbData(Request req, Response rsp) { - executorClient.exportData(Context.fromRequest(req), Integer.parseInt(req.params("id"))); + executorClient.exportData(Integer.parseInt(req.params("id"))); return ""; } @@ -302,9 +289,9 @@ public class ControlNodeActionsService { FileStorageId source = parseSourceFileStorageId(req.queryParams("source")); switch (exportType) { - case "atags" -> executorClient.exportAtags(Context.fromRequest(req), Integer.parseInt(req.params("id")), source); - case "rss" -> executorClient.exportRssFeeds(Context.fromRequest(req), Integer.parseInt(req.params("id")), source); - case "termFreq" -> executorClient.exportTermFrequencies(Context.fromRequest(req), Integer.parseInt(req.params("id")), source); + case "atags" -> executorClient.exportAtags(Integer.parseInt(req.params("id")), source); + case "rss" -> executorClient.exportRssFeeds(Integer.parseInt(req.params("id")), source); + case "termFreq" -> executorClient.exportTermFrequencies(Integer.parseInt(req.params("id")), source); default -> throw new ControlValidationError("No export type specified", "An export type must be specified", ".."); } @@ -316,7 +303,7 @@ public class ControlNodeActionsService { int size = Integer.parseInt(req.queryParams("size")); String name = req.queryParams("name"); - executorClient.exportSampleData(Context.fromRequest(req), Integer.parseInt(req.params("id")), source, size, name); + executorClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name); return ""; } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java index 66a2b485..e4859eb3 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -107,13 +107,13 @@ public class ControlNodeService { } public Object startFsm(Request req, Response rsp) throws Exception { - executorClient.startFsm(Context.fromRequest(req), Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); + executorClient.startFsm(Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); return redirectToOverview(req); } public Object stopFsm(Request req, Response rsp) throws Exception { - executorClient.stopFsm(Context.fromRequest(req), Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); + executorClient.stopFsm(Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); return redirectToOverview(req); } @@ -132,7 +132,7 @@ public class ControlNodeService { int nodeId = Integer.parseInt(request.params("id")); String processBase = request.params("processBase"); - executorClient.stopProcess(Context.fromRequest(request), nodeId, processBase); + executorClient.stopProcess(nodeId, processBase); return ""; } @@ -153,7 +153,7 @@ public class ControlNodeService { return Map.of( "tab", Map.of("actors", true), "node", nodeConfigurationService.get(nodeId), - "actors", executorClient.getActorStates(Context.fromRequest(request), nodeId).states() + "actors", executorClient.getActorStates(nodeId).states() ); } @@ -164,7 +164,7 @@ public class ControlNodeService { "tab", Map.of("actions", true), "node", nodeConfigurationService.get(nodeId), "view", Map.of(request.queryParams("view"), true), - "uploadDirContents", executorClient.listSideloadDir(Context.fromRequest(request), nodeId), + "uploadDirContents", executorClient.listSideloadDir(nodeId), "allBackups", fileStorageService.getEachFileStorage(nodeId, FileStorageType.BACKUP), "allCrawlData", @@ -279,9 +279,9 @@ public class ControlNodeService { int nodeId = Integer.parseInt(request.params("id")); var config = nodeConfigurationService.get(nodeId); - var actors = executorClient.getActorStates(Context.fromRequest(request), nodeId).states(); - - actors.removeIf(actor -> actor.state().equals("MONITOR")); + var actors = executorClient.getActorStates(nodeId).states() + .stream().filter(actor -> !actor.state().equals("MONITOR")) + .toList(); return Map.of( "node", nodeConfigurationService.get(nodeId), @@ -308,7 +308,7 @@ public class ControlNodeService { } private List getEvents(int nodeId) { - List services = List.of(ServiceId.Index.name+":"+nodeId, ServiceId.Executor.name+":"+nodeId); + List services = List.of(ServiceId.Index.serviceName +":"+nodeId, ServiceId.Executor.serviceName +":"+nodeId); List events = new ArrayList<>(20); for (var service :services) { events.addAll(eventLogService.getLastEntriesForService(service, Long.MAX_VALUE, 10)); @@ -399,7 +399,7 @@ public class ControlNodeService { List files = new ArrayList<>(); - for (var execFile : executorClient.listFileStorage(context, node, fileId).files()) { + for (var execFile : executorClient.listFileStorage(node, fileId).files()) { files.add(new FileStorageFileModel( execFile.name(), execFile.modTime(), diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java index cfef6577..726491ec 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java @@ -2,7 +2,6 @@ package nu.marginalia.control.sys.svc; import com.google.inject.Inject; import lombok.SneakyThrows; -import nu.marginalia.client.Context; import nu.marginalia.control.ControlRendererFactory; import nu.marginalia.control.Redirects; import nu.marginalia.control.actor.ControlActor; @@ -56,7 +55,7 @@ public class ControlSysActionsService { * and lacks a proper internal API */ private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) { - String inboxName = ServiceId.Api.name + ":" + "0"; + String inboxName = ServiceId.Api.serviceName + ":" + "0"; String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID()); } @@ -119,7 +118,7 @@ public class ControlSysActionsService { // This is technically not a partitioned operation, but we execute it at node 1 // and let the effects be global :-) - executorClient.calculateAdjacencies(Context.fromRequest(request), 1); + executorClient.calculateAdjacencies(1); return ""; } diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index 26f97808..e903d048 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -51,6 +51,7 @@ dependencies { implementation libs.bundles.slf4j implementation libs.spark + implementation libs.bundles.grpc implementation libs.gson implementation libs.prometheus implementation libs.notnull diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java index 2e570f05..13923302 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java @@ -2,17 +2,21 @@ package nu.marginalia.actor; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.actor.proc.ProcessLivenessMonitorActor; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.executor.api.RpcActorRunState; +import nu.marginalia.executor.api.RpcActorRunStates; +import nu.marginalia.executor.api.RpcFsmName; +import nu.marginalia.executor.api.RpcProcessId; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.process.ProcessService; import nu.marginalia.service.module.ServiceConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import spark.Spark; +import java.util.Comparator; + @Singleton public class ActorApi { private final ExecutorActorControlService actors; @@ -32,33 +36,20 @@ public class ActorApi { this.serviceConfiguration = serviceConfiguration; } - public Object startActorFromState(Request request, Response response) throws Exception { - ExecutorActor actor = translateActor(request.params("id")); - String state = request.params("state"); - actors.startFromJSON(actor, state, request.body()); - - return ""; - } - - public Object startActor(Request request, Response response) throws Exception { - ExecutorActor actor = translateActor(request.params("id")); + public void startActor(RpcFsmName actorName) throws Exception { + ExecutorActor actor = translateActor(actorName.getActorName()); actors.start(actor); - - return ""; } - public Object stopActor(Request request, Response response) { - ExecutorActor actor = translateActor(request.params("id")); - + public void stopActor(RpcFsmName actorName) { + ExecutorActor actor = translateActor(actorName.getActorName()); actors.stop(actor); - - return "OK"; } - public Object stopProcess(Request request, Response response) { - ProcessService.ProcessId id = ProcessService.translateExternalIdBase(request.params("id")); + public Object stopProcess(RpcProcessId processId) { + ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId()); try { String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node(); @@ -84,6 +75,45 @@ public class ActorApi { return "OK"; } + + public RpcActorRunStates getActorStates() { + var items = actors.getActorStates().entrySet().stream().map(e -> { + final var stateGraph = actors.getActorDefinition(e.getKey()); + + final ActorStateInstance state = e.getValue(); + final String actorDescription = stateGraph.describe(); + + final String machineName = e.getKey().name(); + final String stateName = state.name(); + + final String stateDescription = ""; + + final boolean terminal = state.isFinal(); + final boolean canStart = actors.isDirectlyInitializable(e.getKey()) && terminal; + + return RpcActorRunState + .newBuilder() + .setActorName(machineName) + .setState(stateName) + .setActorDescription(actorDescription) + .setStateDescription(stateDescription) + .setTerminal(terminal) + .setCanStart(canStart) + .build(); + + }) + .filter(s -> !s.getTerminal() || s.getCanStart()) + .sorted(Comparator.comparing(RpcActorRunState::getActorName)) + .toList(); + + return RpcActorRunStates.newBuilder() + .setNode(serviceConfiguration.node()) + .addAllActorRunStates(items) + .build(); + + } + + public ExecutorActor translateActor(String name) { try { return ExecutorActor.valueOf(name.toUpperCase()); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java index 8d456822..4e1804df 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java @@ -1,7 +1,7 @@ package nu.marginalia.actor; import com.google.gson.Gson; -import jakarta.inject.Singleton; +import com.google.inject.Singleton; import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.model.gson.GsonFactory; diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java new file mode 100644 index 00000000..0816c51b --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java @@ -0,0 +1,313 @@ +package nu.marginalia.executor; + +import com.google.inject.Inject; +import io.grpc.stub.StreamObserver; +import nu.marginalia.actor.ActorApi; +import nu.marginalia.executor.api.*; +import nu.marginalia.executor.svc.*; + +public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase { + private final ActorApi actorApi; + private final ExportService exportService; + private final SideloadService sideloadService; + private final BackupService backupService; + private final TransferService transferService; + private final ProcessingService processingService; + + @Inject + public ExecutorGrpcService(ActorApi actorApi, + ExportService exportService, + SideloadService sideloadService, + BackupService backupService, + TransferService transferService, + ProcessingService processingService) + { + this.actorApi = actorApi; + this.exportService = exportService; + this.sideloadService = sideloadService; + this.backupService = backupService; + this.transferService = transferService; + this.processingService = processingService; + } + + @Override + public void startFsm(RpcFsmName request, StreamObserver responseObserver) { + try { + actorApi.startActor(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void stopFsm(RpcFsmName request, StreamObserver responseObserver) { + try { + actorApi.stopActor(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void stopProcess(RpcProcessId request, StreamObserver responseObserver) { + try { + actorApi.stopProcess(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerCrawl(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startCrawl(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerRecrawl(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startRecrawl(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerConvert(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startConversion(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerConvertAndLoad(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startConvertLoad(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void loadProcessedData(RpcFileStorageIds request, StreamObserver responseObserver) { + try { + processingService.startLoad(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void calculateAdjacencies(Empty request, StreamObserver responseObserver) { + try { + processingService.startAdjacencyCalculation(); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadEncyclopedia(RpcSideloadEncyclopedia request, StreamObserver responseObserver) { + try { + sideloadService.sideloadEncyclopedia(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadDirtree(RpcSideloadDirtree request, StreamObserver responseObserver) { + try { + sideloadService.sideloadDirtree(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadWarc(RpcSideloadWarc request, StreamObserver responseObserver) { + try { + sideloadService.sideloadWarc(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadStackexchange(RpcSideloadStackexchange request, StreamObserver responseObserver) { + try { + sideloadService.sideloadStackexchange(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request, StreamObserver responseObserver) { + try { + processingService.createCrawlSpecFromDownload(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportAtags(RpcFileStorageId request, StreamObserver responseObserver) { + try { + exportService.exportAtags(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportSampleData(RpcExportSampleData request, StreamObserver responseObserver) { + try { + exportService.exportSampleData(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportRssFeeds(RpcFileStorageId request, StreamObserver responseObserver) { + try { + exportService.exportFeeds(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportTermFrequencies(RpcFileStorageId request, StreamObserver responseObserver) { + try { + exportService.exportTermFrequencies(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void downloadSampleData(RpcDownloadSampleData request, StreamObserver responseObserver) { + try { + sideloadService.downloadSampleData(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportData(Empty request, StreamObserver responseObserver) { + try { + exportService.exportData(); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + + } + + @Override + public void restoreBackup(RpcFileStorageId request, StreamObserver responseObserver) { + try { + backupService.restore(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void getActorStates(Empty request, StreamObserver responseObserver) { + responseObserver.onNext(actorApi.getActorStates()); + responseObserver.onCompleted(); + } + + @Override + public void listSideloadDir(Empty request, StreamObserver responseObserver) { + try { + responseObserver.onNext(sideloadService.listUploadDir()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void listFileStorage(RpcFileStorageId request, StreamObserver responseObserver) { + try { + responseObserver.onNext(transferService.listFiles(request)); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } +} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java index 613ca18f..ac567467 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java @@ -2,27 +2,19 @@ package nu.marginalia.executor; import com.google.gson.Gson; import com.google.inject.Inject; +import io.grpc.ServerBuilder; import nu.marginalia.actor.ExecutorActor; -import nu.marginalia.actor.ActorApi; import nu.marginalia.actor.ExecutorActorControlService; -import nu.marginalia.actor.state.ActorStateInstance; -import nu.marginalia.executor.model.ActorRunState; -import nu.marginalia.executor.model.ActorRunStates; -import nu.marginalia.executor.svc.*; +import nu.marginalia.executor.svc.TransferService; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; import nu.marginalia.service.server.mq.MqRequest; -import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import spark.Spark; import java.io.IOException; import java.sql.SQLException; -import java.util.Comparator; -import java.util.concurrent.ConcurrentHashMap; // Weird name for this one to not have clashes with java.util.concurrent.ExecutorService public class ExecutorSvc extends Service { @@ -36,51 +28,19 @@ public class ExecutorSvc extends Service { @Inject public ExecutorSvc(BaseServiceParams params, ExecutorActorControlService actorControlService, - ProcessingService processingService, - SideloadService sideloadService, - BackupService backupService, - ExportService exportService, + ExecutorGrpcService executorGrpcService, Gson gson, - TransferService transferService, - ActorApi actorApi) { + TransferService transferService) throws IOException { super(params); this.params = params; this.gson = gson; this.actorControlService = actorControlService; this.transferService = transferService; - Spark.post("/actor/:id/start", actorApi::startActor); - Spark.post("/actor/:id/start/:state", actorApi::startActorFromState); - Spark.post("/actor/:id/stop", actorApi::stopActor); - - Spark.get("/actor", this::getActorStates, gson::toJson); - - Spark.post("/process/:id/stop", actorApi::stopProcess); - - Spark.post("/process/crawl/:fid", processingService::startCrawl); - Spark.post("/process/recrawl", processingService::startRecrawl); - Spark.post("/process/convert/:fid", processingService::startConversion); - Spark.post("/process/convert-load/:fid", processingService::startConvertLoad); - Spark.post("/process/crawl-spec/from-download", processingService::createCrawlSpecFromDownload); - Spark.post("/process/load", processingService::startLoad); - Spark.post("/process/adjacency-calculation", processingService::startAdjacencyCalculation); - - Spark.get("/sideload/", sideloadService::listUploadDir, gson::toJson); - Spark.post("/sideload/dirtree", sideloadService::sideloadDirtree); - Spark.post("/sideload/warc", sideloadService::sideloadWarc); - Spark.post("/sideload/stackexchange", sideloadService::sideloadStackexchange); - Spark.post("/sideload/encyclopedia", sideloadService::sideloadEncyclopedia); - - Spark.post("/action/download-sample-data", sideloadService::downloadSampleData); - - Spark.post("/export/atags", exportService::exportAtags); - Spark.post("/export/sample-data", exportService::exportSampleData); - Spark.post("/export/feeds", exportService::exportFeeds); - Spark.post("/export/termfreq", exportService::exportTermFrequencies); - Spark.post("/export/data", exportService::exportData); - - Spark.post("/backup/:fid/restore", backupService::restore); - Spark.get("/storage/:fid", transferService::listFiles, gson::toJson); + var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1) + .addService(executorGrpcService) + .build(); + grpcServer.start(); Spark.get("/transfer/file/:fid", transferService::transferFile); @@ -123,37 +83,4 @@ public class ExecutorSvc extends Service { return "OK"; } - - private final ConcurrentHashMap actorStateDescriptions = new ConcurrentHashMap<>(); - - private ActorRunStates getActorStates(Request request, Response response) { - var items = actorControlService.getActorStates().entrySet().stream().map(e -> { - final var stateGraph = actorControlService.getActorDefinition(e.getKey()); - - final ActorStateInstance state = e.getValue(); - final String actorDescription = stateGraph.describe(); - - final String machineName = e.getKey().name(); - final String stateName = state.name(); - - final String stateDescription = ""; - - final boolean terminal = state.isFinal(); - final boolean canStart = actorControlService.isDirectlyInitializable(e.getKey()) && terminal; - - return new ActorRunState(machineName, - stateName, - actorDescription, - stateDescription, - terminal, - canStart); - }) - .filter(s -> !s.terminal() || s.canStart()) - .sorted(Comparator.comparing(ActorRunState::name)) - .toList(); - - return new ActorRunStates(params.configuration.node(), items); - } - - } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java index 05acceb0..45f8c622 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java @@ -4,9 +4,8 @@ import com.google.inject.Inject; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.RestoreBackupActor; +import nu.marginalia.executor.api.RpcFileStorageId; import nu.marginalia.storage.model.FileStorageId; -import spark.Request; -import spark.Response; public class BackupService { private final ExecutorActorControlService actorControlService; @@ -16,9 +15,8 @@ public class BackupService { this.actorControlService = actorControlService; } - public Object restore(Request request, Response response) throws Exception { - var fid = FileStorageId.parse(request.params("fid")); + public void restore(RpcFileStorageId request) throws Exception { + var fid = FileStorageId.of(request.getFileStorageId()); actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, new RestoreBackupActor.Restore(fid)); - return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java index 388af5b5..9f941ab9 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java @@ -3,13 +3,10 @@ package nu.marginalia.executor.svc; import com.google.inject.Inject; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; -import nu.marginalia.actor.task.ConvertActor; -import nu.marginalia.actor.task.ExportAtagsActor; -import nu.marginalia.actor.task.ExportDataActor; -import nu.marginalia.actor.task.ExportSampleDataActor; +import nu.marginalia.actor.task.*; +import nu.marginalia.executor.api.RpcExportSampleData; +import nu.marginalia.executor.api.RpcFileStorageId; import nu.marginalia.storage.model.FileStorageId; -import spark.Request; -import spark.Response; public class ExportService { private final ExecutorActorControlService actorControlService; @@ -19,33 +16,36 @@ public class ExportService { this.actorControlService = actorControlService; } - public Object exportData(Request request, Response response) throws Exception { + public void exportData() throws Exception { actorControlService.startFrom(ExecutorActor.EXPORT_DATA, new ExportDataActor.Export()); - return ""; } - public Object exportSampleData(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA, new ExportSampleDataActor.Export( - FileStorageId.parse(request.queryParams("fid")), - Integer.parseInt(request.queryParams("size")), - request.queryParams("name") - )); - return ""; + public void exportSampleData(RpcExportSampleData request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA, + new ExportSampleDataActor.Export( + FileStorageId.of(request.getFileStorageId()), + request.getSize(), + request.getName() + ) + ); } - public Object exportAtags(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid")))); - return ""; + public void exportAtags(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, + new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId())) + ); } - public Object exportFeeds(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid")))); - return ""; - } - public Object exportTermFrequencies(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid")))); - return ""; + public void exportFeeds(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, + new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId())) + ); } + public void exportTermFrequencies(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, + new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId())) + ); + } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java index f8d83d98..bee5c2c1 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java @@ -1,84 +1,62 @@ package nu.marginalia.executor.svc; -import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.*; +import nu.marginalia.executor.api.RpcCrawlSpecFromDownload; +import nu.marginalia.executor.api.RpcFileStorageId; +import nu.marginalia.executor.api.RpcFileStorageIds; import nu.marginalia.storage.model.FileStorageId; -import nu.marginalia.executor.model.load.LoadParameters; -import spark.Request; -import spark.Response; + +import java.util.stream.Collectors; public class ProcessingService { private final ExecutorActorControlService actorControlService; - private final Gson gson; @Inject - public ProcessingService(ExecutorActorControlService actorControlService, - Gson gson) { + public ProcessingService(ExecutorActorControlService actorControlService) { this.actorControlService = actorControlService; - this.gson = gson; } - public Object startRecrawl(Request request, Response response) throws Exception { - var crawlId = gson.fromJson(request.body(), FileStorageId.class); - - actorControlService.startFrom( - ExecutorActor.RECRAWL, - new RecrawlActor.Initial(crawlId, false) - ); - - return ""; + public void startRecrawl(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.RECRAWL, + new RecrawlActor.Initial(FileStorageId.of(request.getFileStorageId()), false)); } - public Object startCrawl(Request request, Response response) throws Exception { + public void startCrawl(RpcFileStorageId request) throws Exception { actorControlService.startFrom(ExecutorActor.CRAWL, - new CrawlActor.Initial(FileStorageId.parse(request.params("fid")))); - - return ""; + new CrawlActor.Initial(FileStorageId.of(request.getFileStorageId()))); } - public Object startConversion(Request request, Response response) throws Exception { + public void startConversion(RpcFileStorageId request) throws Exception { actorControlService.startFrom(ExecutorActor.CONVERT, - new ConvertActor.Convert(FileStorageId.parse(request.params("fid")))); - - return ""; + new ConvertActor.Convert(FileStorageId.of(request.getFileStorageId()))); } - public Object startConvertLoad(Request request, Response response) throws Exception { - actorControlService.startFrom( - ExecutorActor.CONVERT_AND_LOAD, - new ConvertAndLoadActor.Initial(FileStorageId.parse(request.params("fid"))) + public void startConvertLoad(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD, + new ConvertAndLoadActor.Initial(FileStorageId.of(request.getFileStorageId()))); + } + + public void startLoad(RpcFileStorageIds request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD, + new ConvertAndLoadActor.Load(request.getFileStorageIdsList() + .stream() + .map(FileStorageId::of) + .collect(Collectors.toList())) ); - - return ""; } - - public Object startLoad(Request request, Response response) throws Exception { - var params = gson.fromJson(request.body(), LoadParameters.class); - - // Start the FSM from the intermediate state that triggers the load - actorControlService.startFrom( - ExecutorActor.CONVERT_AND_LOAD, - new ConvertAndLoadActor.Load(params.ids()) - ); - - return ""; - } - - public Object startAdjacencyCalculation(Request request, Response response) throws Exception { + public void startAdjacencyCalculation() throws Exception { actorControlService.startFrom(ExecutorActor.ADJACENCY_CALCULATION, new TriggerAdjacencyCalculationActor.Run()); - return ""; } - public Object createCrawlSpecFromDownload(Request request, Response response) throws Exception { + public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request) throws Exception { actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR, new CrawlJobExtractorActor.CreateFromUrl( - request.queryParamOrDefault("description", ""), - request.queryParamOrDefault("url", "")) + request.getDescription(), + request.getUrl()) ); - return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java index 8ef39c12..7daefe6e 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java @@ -6,18 +6,16 @@ import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.ConvertActor; import nu.marginalia.actor.task.DownloadSampleActor; -import nu.marginalia.executor.upload.UploadDirContents; -import nu.marginalia.executor.upload.UploadDirItem; +import nu.marginalia.executor.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; public class SideloadService { private final ExecutorActorControlService actorControlService; @@ -28,43 +26,61 @@ public class SideloadService { this.actorControlService = actorControlService; } - public Object sideloadDirtree(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertDirtree(request.queryParams("path"))); - return ""; + public void sideloadDirtree(RpcSideloadDirtree request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertDirtree(request.getSourcePath()) + ); } - public Object sideloadWarc(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertWarc(request.queryParams("path"))); - return ""; + + public void sideloadWarc(RpcSideloadWarc request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertWarc(request.getSourcePath()) + ); } - public Object sideloadEncyclopedia(Request request, Response response) throws Exception { + + public void sideloadEncyclopedia(RpcSideloadEncyclopedia request) throws Exception { actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertEncyclopedia( - request.queryParams("path"), - request.queryParams("baseUrl") + request.getSourcePath(), + request.getBaseUrl() )); - return ""; } - public Object sideloadStackexchange(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertStackexchange(request.queryParams("path"))); - return ""; + public void sideloadStackexchange(RpcSideloadStackexchange request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertStackexchange(request.getSourcePath()) + ); } - public UploadDirContents listUploadDir(Request request, Response response) throws IOException { + public RpcUploadDirContents listUploadDir() throws IOException { Path uploadDir = WmsaHome.getUploadDir(); try (var items = Files.list(uploadDir)) { - return new UploadDirContents(uploadDir.toString(), - items.map(UploadDirItem::fromPath).toList()); - } + var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString()); + var iter = items.iterator(); + while (iter.hasNext()) { + var path = iter.next(); + + boolean isDir = Files.isDirectory(path); + long size = isDir ? 0 : Files.size(path); + var mtime = Files.getLastModifiedTime(path); + + builder.addEntriesBuilder() + .setName(path.toString()) + .setIsDirectory(isDir) + .setLastModifiedTime( + LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME)) + .setSize(size); + } + + return builder.build(); + } } - public Object downloadSampleData(Request request, Response response) throws Exception { - String sampleSet = request.queryParams("set"); + public void downloadSampleData(RpcDownloadSampleData request) throws Exception { + String sampleSet = request.getSampleSet(); actorControlService.startFrom(ExecutorActor.DOWNLOAD_SAMPLE, new DownloadSampleActor.Run(sampleSet)); - - return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java index ed84695f..377bd354 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java @@ -5,11 +5,12 @@ import com.google.inject.Inject; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; import nu.marginalia.client.Context; +import nu.marginalia.executor.api.RpcFileStorageContent; +import nu.marginalia.executor.api.RpcFileStorageEntry; +import nu.marginalia.executor.api.RpcFileStorageId; import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.model.transfer.TransferItem; import nu.marginalia.executor.model.transfer.TransferSpec; -import nu.marginalia.executor.storage.FileStorageContent; -import nu.marginalia.executor.storage.FileStorageFile; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.process.log.WorkLog; @@ -74,30 +75,31 @@ public class TransferService { } - public FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException { - FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); + public RpcFileStorageContent listFiles(RpcFileStorageId request) throws SQLException, IOException { + FileStorageId fileStorageId = FileStorageId.of(request.getFileStorageId()); var storage = fileStorageService.getStorage(fileStorageId); - List files; + var builder = RpcFileStorageContent.newBuilder(); + try (var fs = Files.list(storage.asPath())) { - files = fs.filter(Files::isRegularFile) + fs.filter(Files::isRegularFile) .map(this::createFileModel) - .sorted(Comparator.comparing(FileStorageFile::name)) - .toList(); + .sorted(Comparator.comparing(RpcFileStorageEntry::getName)) + .forEach(builder::addEntries); } - return new FileStorageContent(files); + return builder.build(); } @SneakyThrows - private FileStorageFile createFileModel(Path path) { - return new FileStorageFile( - path.toFile().getName(), - Files.size(path), - Files.getLastModifiedTime(path).toInstant().toString() - ); + private RpcFileStorageEntry createFileModel(Path path) { + return RpcFileStorageEntry.newBuilder() + .setName(path.toFile().getName()) + .setSize(Files.size(path)) + .setLastModifiedTime(Files.getLastModifiedTime(path).toInstant().toString()) + .build(); } public TransferSpec getTransferSpec(Request request, Response response) throws SQLException { diff --git a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java b/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java deleted file mode 100644 index 29fdf8e3..00000000 --- a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java +++ /dev/null @@ -1,155 +0,0 @@ -package nu.marginalia.executor; - -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Provides; -import nu.marginalia.actor.ExecutorActor; -import nu.marginalia.actor.ExecutorActorControlService; -import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor; -import nu.marginalia.client.Context; -import nu.marginalia.client.route.RouteProvider; -import nu.marginalia.process.ProcessOutboxes; -import nu.marginalia.process.ProcessService; -import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageId; -import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.index.client.IndexClient; -import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.inbox.MqAsynchronousInbox; -import nu.marginalia.service.control.ServiceEventLog; -import nu.marginalia.service.control.ServiceHeartbeatImpl; -import nu.marginalia.service.descriptor.ServiceDescriptor; -import nu.marginalia.service.descriptor.ServiceDescriptors; -import nu.marginalia.service.id.ServiceId; -import nu.marginalia.service.module.ServiceConfiguration; -import nu.marginalia.service.server.Initialization; -import nu.marginalia.service.server.MetricsServer; -import org.junit.jupiter.api.*; -import org.mockito.Mockito; -import spark.Spark; - -import java.nio.file.Path; -import java.util.List; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; - -@Tag("slow") -public class ExecutorSvcApiIntegrationTest { - - static TestInstances testInstances; - static final int port = 9999; - - @BeforeAll - public static void setUpAll() { - RouteProvider.setDefaultPort(port); - var injector = Guice.createInjector(new TestModule()); - testInstances = injector.getInstance(TestInstances.class); - injector.getInstance(Initialization.class).setReady(); - } - - @AfterAll - public static void tearDownAll() { - RouteProvider.resetDefaultPort(); - } - - @BeforeEach - public void setUp() { - Mockito.reset(testInstances.actorControlService); - } - - @AfterAll - public static void tearDown() { - Spark.stop(); - } - - @Test - public void startStartActor() throws Exception { - testInstances.client.startFsm(Context.internal(), 0, "crawl"); - Mockito.verify(testInstances.actorControlService).start(ExecutorActor.CRAWL); - } - - @Test - public void startStopActor() { - testInstances.client.stopFsm(Context.internal(), 0, "crawl"); - - Mockito.verify(testInstances.actorControlService).stop(ExecutorActor.CRAWL); - } - - @Test - public void triggerCrawl() throws Exception { - testInstances.client.triggerCrawl(Context.internal(), 0, FileStorageId.of(1)); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL), any()); - } - - @Test - public void triggerRecrawl() throws Exception { - testInstances.client.triggerRecrawl(Context.internal(), 0, - new FileStorageId(0)); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RECRAWL), any()); - } - - - @Test - public void triggerProcessAndLoad() throws Exception { - testInstances.client.triggerConvertAndLoad(Context.internal(), 0, FileStorageId.of(1)); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT_AND_LOAD), any()); - } - - @Test - public void calculateAdjacencies() throws Exception { - testInstances.client.calculateAdjacencies(Context.internal(), 0); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.ADJACENCY_CALCULATION), eq(new TriggerAdjacencyCalculationActor.Run())); - } - -} - -class TestInstances { - @Inject - ExecutorSvc service; - @Inject - ExecutorClient client; - @Inject - ExecutorActorControlService actorControlService; -} -class TestModule extends AbstractModule { - - @Override - public void configure() { - System.setProperty("service-name", "test"); - bind(ExecutorActorControlService.class).toInstance(Mockito.mock(ExecutorActorControlService.class)); - bind(FileStorageService.class).toInstance(Mockito.mock(FileStorageService.class)); - bind(ProcessService.class).toInstance(Mockito.mock(ProcessService.class)); - bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class)); - bind(ServiceHeartbeatImpl.class).toInstance(Mockito.mock(ServiceHeartbeatImpl.class)); - bind(MetricsServer.class).toInstance(Mockito.mock(MetricsServer.class)); - bind(IndexClient.class).toInstance(Mockito.mock(IndexClient.class)); - bind(ProcessOutboxes.class).toInstance(Mockito.mock(ProcessOutboxes.class)); - bind(ServiceConfiguration.class) - .toInstance(new ServiceConfiguration(ServiceId.Executor, - 0, "127.0.0.1", ExecutorSvcApiIntegrationTest.port, -1, UUID.randomUUID())); - } - - @Provides - public ServiceDescriptors getServiceDescriptors() { - return new ServiceDescriptors( - List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1")) - ); - } - - @Provides - public MessageQueueFactory getMessageQueueFactory() { - var mock = Mockito.mock(MessageQueueFactory.class); - - Mockito.when(mock.createAsynchronousInbox(Mockito.anyString(), Mockito.anyInt(), any())). - thenReturn(Mockito.mock(MqAsynchronousInbox.class)); - - return mock; - } -} \ No newline at end of file diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/NodeConfigurationWatcher.java similarity index 95% rename from code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java rename to code/services-core/query-service/src/main/java/nu/marginalia/query/NodeConfigurationWatcher.java index f26da0f8..a7117387 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/NodeConfigurationWatcher.java @@ -1,4 +1,4 @@ -package nu.marginalia.query.svc; +package nu.marginalia.query; import com.google.inject.Inject; import lombok.SneakyThrows; @@ -6,7 +6,6 @@ import nu.marginalia.nodecfg.NodeConfigurationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java index 363b493f..15da22bd 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java @@ -4,11 +4,9 @@ import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.client.Context; import nu.marginalia.index.client.IndexClient; -import nu.marginalia.index.client.model.query.SearchSetIdentifier; import nu.marginalia.index.query.limit.QueryLimits; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.query.model.QueryParams; -import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.RendererFactory; import nu.marginalia.query.svc.QueryFactory; diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java index 78cfc637..2eb4c01c 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java @@ -3,28 +3,34 @@ package nu.marginalia.query; import com.google.inject.Inject; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; +import nu.marginalia.client.grpc.GrpcStubPool; import nu.marginalia.index.api.IndexDomainLinksApiGrpc; import nu.marginalia.index.api.RpcDomainIdCount; import nu.marginalia.index.api.RpcDomainIdList; import nu.marginalia.index.api.RpcDomainIdPairs; -import nu.marginalia.query.svc.NodeConfigurationWatcher; +import nu.marginalia.service.id.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase { private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class); - private final NodeConfigurationWatcher nodeConfigurationWatcher; - private final QueryGrpcStubPool stubPool; + private final GrpcStubPool stubPool; @Inject public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) { - this.nodeConfigurationWatcher = nodeConfigurationWatcher; - stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) { + stubPool = new GrpcStubPool<>(ServiceId.Index) { @Override public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) { return IndexDomainLinksApiGrpc.newBlockingStub(channel); } + + @Override + public List getEligibleNodes() { + return nodeConfigurationWatcher.getQueryNodes(); + } }; } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java index 5f59ee15..ef253253 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java @@ -4,11 +4,12 @@ import com.google.inject.Inject; import io.grpc.ManagedChannel; import io.prometheus.client.Histogram; import lombok.SneakyThrows; +import nu.marginalia.client.grpc.GrpcStubPool; import nu.marginalia.db.DomainBlacklist; import nu.marginalia.index.api.*; import nu.marginalia.model.id.UrlIdCodec; -import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.QueryFactory; +import nu.marginalia.service.id.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,23 +27,29 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { .help("QS-side query time (GRPC endpoint)") .register(); - private final QueryGrpcStubPool stubPool; + private final GrpcStubPool stubPool; private final QueryFactory queryFactory; private final DomainBlacklist blacklist; - private final NodeConfigurationWatcher nodeConfigurationWatcher; @Inject - public QueryGRPCService(QueryFactory queryFactory, DomainBlacklist blacklist, NodeConfigurationWatcher nodeConfigurationWatcher) { + public QueryGRPCService(QueryFactory queryFactory, + DomainBlacklist blacklist, + NodeConfigurationWatcher nodeConfigurationWatcher) + { this.queryFactory = queryFactory; this.blacklist = blacklist; - this.nodeConfigurationWatcher = nodeConfigurationWatcher; - stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) { + stubPool = new GrpcStubPool<>(ServiceId.Index) { @Override public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) { return IndexApiGrpc.newBlockingStub(channel); } + + @Override + public List getEligibleNodes() { + return nodeConfigurationWatcher.getQueryNodes(); + } }; } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java index 7e864635..bd5eaebd 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java @@ -12,7 +12,6 @@ import nu.marginalia.index.client.model.results.DecoratedSearchResultItem; import nu.marginalia.index.client.model.results.SearchResultSet; import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryResponse; -import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.QueryFactory; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; diff --git a/protobuf.gradle b/protobuf.gradle new file mode 100644 index 00000000..af2f5317 --- /dev/null +++ b/protobuf.gradle @@ -0,0 +1,29 @@ +// Boilerplate configuration that should be included whenever protobufs are used +// see e.g. index-api's build.gradle + +protobuf { + protoc { + if (osdetector.os == "osx") { + artifact = "com.google.protobuf:protoc:3.0.2:osx-x86_64" + } else { + artifact = "com.google.protobuf:protoc:3.0.2" + } + } + plugins { + grpc { + if (osdetector.os == "osx") { + artifact = "io.grpc:protoc-gen-grpc-java:1.1.2:osx-x86_64" + } else { + artifact = "io.grpc:protoc-gen-grpc-java:1.1.2" + } + } + } + + generateProtoTasks { + all().each { task -> + task.plugins { + grpc {} + } + } + } +} \ No newline at end of file