From e8de468b0b7c11faeb0f0ac65540574a9e95f7cf Mon Sep 17 00:00:00 2001 From: Viktor Date: Thu, 8 Feb 2024 13:01:12 +0100 Subject: [PATCH] Make executor API talk GRPC (#75) * (executor-api) Make executor API talk GRPC The executor's REST API was very fragile and annoying to work with, lacking even basic type safety. Migrate to use GRPC instead. GRPC is a bit of a pain with how verbose it is, but that is probably a lesser evil. This is a fairly straightforward change, but it's also large so a solid round of testing is needed... The change set breaks out the GrpcStubPool previously residing in the QueryService, and makes it available to all clients. ServiceId.name was also renamed to avoid the very dangerous clash with Enum.name(). The boilerplate needed for grpc was also extracted into a common gradle file for inclusion into the appropriate build.gradle-files. --- code/api/executor-api/build.gradle | 15 + .../executor/client/ExecutorClient.java | 274 +++++++++++---- .../client/ExecutorRemoteActorFactory.java | 2 +- .../executor/model/load/LoadParameters.java | 10 - .../executor/upload/UploadDirItem.java | 20 -- .../src/main/protobuf/executor-api.proto | 105 ++++++ code/api/index-api/build.gradle | 28 +- .../marginalia/index/client/IndexClient.java | 3 +- .../storage/model/FileStorageId.java | 2 +- code/common/service-client/build.gradle | 1 + .../nu/marginalia/client/ServiceMonitors.java | 2 +- .../marginalia/client/grpc/GrpcStubPool.java} | 39 ++- .../service/descriptor/ServiceDescriptor.java | 2 +- .../nu/marginalia/service/id/ServiceId.java | 11 +- .../java/nu/marginalia/service/MainClass.java | 4 +- .../service/module/ServiceConfiguration.java | 2 +- .../service/server/NodeStatusWatcher.java | 2 +- .../marginalia/control/RedirectControl.java | 2 +- .../node/svc/ControlNodeActionsService.java | 47 +-- .../control/node/svc/ControlNodeService.java | 20 +- .../sys/svc/ControlSysActionsService.java | 5 +- .../executor-service/build.gradle | 1 + .../java/nu/marginalia/actor/ActorApi.java | 74 +++-- .../actor/ExecutorActorStateMachines.java | 2 +- .../executor/ExecutorGrpcService.java | 313 ++++++++++++++++++ .../nu/marginalia/executor/ExecutorSvc.java | 89 +---- .../executor/svc/BackupService.java | 8 +- .../executor/svc/ExportService.java | 50 +-- .../executor/svc/ProcessingService.java | 78 ++--- .../executor/svc/SideloadService.java | 70 ++-- .../executor/svc/TransferService.java | 32 +- .../ExecutorSvcApiIntegrationTest.java | 155 --------- .../{svc => }/NodeConfigurationWatcher.java | 3 +- .../marginalia/query/QueryBasicInterface.java | 2 - .../query/QueryGRPCDomainLinksService.java | 16 +- .../nu/marginalia/query/QueryGRPCService.java | 19 +- .../nu/marginalia/query/QueryService.java | 1 - protobuf.gradle | 29 ++ 38 files changed, 940 insertions(+), 598 deletions(-) delete mode 100644 code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java create mode 100644 code/api/executor-api/src/main/protobuf/executor-api.proto rename code/{services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java => common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java} (60%) create mode 100644 code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java delete mode 100644 code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java rename code/services-core/query-service/src/main/java/nu/marginalia/query/{svc => }/NodeConfigurationWatcher.java (95%) create mode 100644 protobuf.gradle diff --git a/code/api/executor-api/build.gradle b/code/api/executor-api/build.gradle index 28defeb2..257c0285 100644 --- a/code/api/executor-api/build.gradle +++ b/code/api/executor-api/build.gradle @@ -1,6 +1,7 @@ plugins { id 'java' id 'jvm-test-suite' + id "com.google.protobuf" version "0.9.4" } java { @@ -9,6 +10,18 @@ java { } } +apply from: "$rootProject.projectDir/protobuf.gradle" + +sourceSets { + main { + proto { + srcDir 'src/main/protobuf' + } + } +} + + + dependencies { implementation project(':code:common:model') implementation project(':code:api:index-api') @@ -25,6 +38,8 @@ dependencies { implementation libs.guice implementation libs.rxjava implementation libs.protobuf + implementation libs.bundles.grpc + implementation libs.javax.annotation implementation libs.gson testImplementation libs.bundles.slf4j.test diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java index df9c4072..c9b286d5 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java @@ -1,154 +1,282 @@ package nu.marginalia.executor.client; import com.google.inject.Inject; -import io.reactivex.rxjava3.core.Observable; +import com.google.inject.Singleton; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; -import nu.marginalia.client.exception.RouteNotConfiguredException; +import nu.marginalia.client.grpc.GrpcStubPool; +import nu.marginalia.executor.api.*; +import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub; +import nu.marginalia.executor.model.ActorRunState; import nu.marginalia.executor.model.ActorRunStates; -import nu.marginalia.executor.model.load.LoadParameters; import nu.marginalia.executor.model.transfer.TransferItem; import nu.marginalia.executor.model.transfer.TransferSpec; import nu.marginalia.executor.storage.FileStorageContent; +import nu.marginalia.executor.storage.FileStorageFile; import nu.marginalia.executor.upload.UploadDirContents; +import nu.marginalia.executor.upload.UploadDirItem; import nu.marginalia.model.gson.GsonFactory; +import nu.marginalia.nodecfg.NodeConfigurationService; +import nu.marginalia.nodecfg.model.NodeConfiguration; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; -import nu.marginalia.storage.model.FileStorage; import nu.marginalia.storage.model.FileStorageId; +import io.grpc.ManagedChannel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.OutputStream; import java.net.URLEncoder; import java.nio.charset.StandardCharsets; import java.nio.file.Path; -import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; +@Singleton public class ExecutorClient extends AbstractDynamicClient { + private final GrpcStubPool stubPool; + private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class); + @Inject - public ExecutorClient(ServiceDescriptors descriptors) { + public ExecutorClient(ServiceDescriptors descriptors, NodeConfigurationService nodeConfigurationService) { super(descriptors.forId(ServiceId.Executor), GsonFactory::get); + + stubPool = new GrpcStubPool<>(ServiceId.Executor) { + @Override + public ExecutorApiBlockingStub createStub(ManagedChannel channel) { + return ExecutorApiGrpc.newBlockingStub(channel); + } + + @Override + public List getEligibleNodes() { + return nodeConfigurationService.getAll() + .stream() + .map(NodeConfiguration::node) + .toList(); + } + }; } - public void startFsm(Context ctx, int node, String actorName) { - post(ctx, node, "/actor/"+actorName+"/start", "").blockingSubscribe(); + public void startFsm(int node, String actorName) { + stubPool.apiForNode(node).startFsm( + RpcFsmName.newBuilder() + .setActorName(actorName) + .build() + ); } - public void stopFsm(Context ctx, int node, String actorName) { - post(ctx, node, "/actor/"+actorName+"/stop", "").blockingSubscribe(); + public void stopFsm(int node, String actorName) { + stubPool.apiForNode(node).stopFsm( + RpcFsmName.newBuilder() + .setActorName(actorName) + .build() + ); } - public void stopProcess(Context ctx, int node, String id) { - post(ctx, node, "/process/" + id + "/stop", "").blockingSubscribe(); + public void stopProcess(int node, String id) { + stubPool.apiForNode(node).stopProcess( + RpcProcessId.newBuilder() + .setProcessId(id) + .build() + ); } - - public void triggerCrawl(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/crawl/" + fid, "").blockingSubscribe(); + public void triggerCrawl(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerCrawl( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void triggerRecrawl(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/recrawl", fid).blockingSubscribe(); + public void triggerRecrawl(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerRecrawl( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void triggerConvert(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/convert/" + fid.id(), "").blockingSubscribe(); + public void triggerConvert(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerConvert( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void triggerConvertAndLoad(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/process/convert-load/" + fid.id(), "").blockingSubscribe(); + public void triggerConvertAndLoad(int node, FileStorageId fid) { + stubPool.apiForNode(node).triggerConvertAndLoad( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void loadProcessedData(Context ctx, int node, LoadParameters ids) { - post(ctx, node, "/process/load", ids).blockingSubscribe(); + public void loadProcessedData(int node, List ids) { + stubPool.apiForNode(node).loadProcessedData( + RpcFileStorageIds.newBuilder() + .addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList()) + .build() + ); } - public void calculateAdjacencies(Context ctx, int node) { - post(ctx, node, "/process/adjacency-calculation", "").blockingSubscribe(); + public void calculateAdjacencies(int node) { + stubPool.apiForNode(node).calculateAdjacencies(Empty.getDefaultInstance()); } - public void sideloadEncyclopedia(Context ctx, int node, Path sourcePath, String baseUrl) { - post(ctx, node, - "/sideload/encyclopedia?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8) + "&baseUrl=" + URLEncoder.encode(baseUrl, StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) { + stubPool.apiForNode(node).sideloadEncyclopedia( + RpcSideloadEncyclopedia.newBuilder() + .setBaseUrl(baseUrl) + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void sideloadDirtree(Context ctx, int node, Path sourcePath) { - post(ctx, node, - "/sideload/dirtree?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadDirtree(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadDirtree( + RpcSideloadDirtree.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void sideloadWarc(Context ctx, int node, Path sourcePath) { - post(ctx, node, - "/sideload/warc?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadWarc(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadWarc( + RpcSideloadWarc.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void sideloadStackexchange(Context ctx, int node, Path sourcePath) { - post(ctx, node, - "/sideload/stackexchange?path="+URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8), - "").blockingSubscribe(); + public void sideloadStackexchange(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadStackexchange( + RpcSideloadStackexchange.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); } - public void createCrawlSpecFromDownload(Context context, int node, String description, String url) { - post(context, node, "/process/crawl-spec/from-download?description="+URLEncoder.encode(description, StandardCharsets.UTF_8)+"&url="+URLEncoder.encode(url, StandardCharsets.UTF_8), "") - .blockingSubscribe(); + public void createCrawlSpecFromDownload(int node, String description, String url) { + stubPool.apiForNode(node).createCrawlSpecFromDownload( + RpcCrawlSpecFromDownload.newBuilder() + .setDescription(description) + .setUrl(url) + .build() + ); } - public void exportAtags(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/export/atags?fid="+fid, "").blockingSubscribe(); + public void exportAtags(int node, FileStorageId fid) { + stubPool.apiForNode(node).exportAtags( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void exportSampleData(Context ctx, int node, FileStorageId fid, int size, String name) { - post(ctx, node, "/export/sample-data?fid="+fid+"&size="+size+"&name="+URLEncoder.encode(name, StandardCharsets.UTF_8), "").blockingSubscribe(); + public void exportSampleData(int node, FileStorageId fid, int size, String name) { + stubPool.apiForNode(node).exportSampleData( + RpcExportSampleData.newBuilder() + .setFileStorageId(fid.id()) + .setSize(size) + .setName(name) + .build() + ); } - public void exportRssFeeds(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/export/feeds?fid="+fid, "").blockingSubscribe(); + public void exportRssFeeds(int node, FileStorageId fid) { + stubPool.apiForNode(node).exportRssFeeds( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void exportTermFrequencies(Context ctx, int node, FileStorageId fid) { - post(ctx, node, "/export/termfreq?fid="+fid, "").blockingSubscribe(); + public void exportTermFrequencies(int node, FileStorageId fid) { + stubPool.apiForNode(node).exportTermFrequencies( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public void downloadSampleData(Context ctx, int node, String sampleSet) { - post(ctx, node, "/action/download-sample-data?set="+URLEncoder.encode(sampleSet, StandardCharsets.UTF_8), "").blockingSubscribe(); + public void downloadSampleData(int node, String sampleSet) { + stubPool.apiForNode(node).downloadSampleData( + RpcDownloadSampleData.newBuilder() + .setSampleSet(sampleSet) + .build() + ); } - public void exportData(Context ctx, int node) { - post(ctx, node, "/export/data", "").blockingSubscribe(); + public void exportData(int node) { + stubPool.apiForNode(node).exportData(Empty.getDefaultInstance()); } - public void restoreBackup(Context context, int node, FileStorageId fid) { - post(context, node, "/backup/" + fid + "/restore", "").blockingSubscribe(); + public void restoreBackup(int node, FileStorageId fid) { + stubPool.apiForNode(node).restoreBackup( + RpcFileStorageId.newBuilder() + .setFileStorageId(fid.id()) + .build() + ); } - public ActorRunStates getActorStates(Context context, int node) { + public ActorRunStates getActorStates(int node) { try { - return get(context, node, "/actor", ActorRunStates.class).blockingFirst(); + var rs = stubPool.apiForNode(node).getActorStates(Empty.getDefaultInstance()); + var states = rs.getActorRunStatesList().stream() + .map(r -> new ActorRunState( + r.getActorName(), + r.getState(), + r.getActorDescription(), + r.getStateDescription(), + r.getTerminal(), + r.getCanStart()) + ) + .toList(); + + return new ActorRunStates(node, states); } - catch (RouteNotConfiguredException ex) { - // node is down, return dummy data - return new ActorRunStates(node, new ArrayList<>()); + catch (Exception ex) { + logger.warn("Failed to get actor states", ex); + + // Return an empty list of states to avoid breaking the UI when a node is down + return new ActorRunStates(node, List.of()); } } - public UploadDirContents listSideloadDir(Context context, int node) { + public UploadDirContents listSideloadDir(int node) { try { - return get(context, node, "/sideload/", UploadDirContents.class).blockingFirst(); + var rs = stubPool.apiForNode(node).listSideloadDir(Empty.getDefaultInstance()); + var items = rs.getEntriesList().stream() + .map(i -> new UploadDirItem(i.getName(), i.getLastModifiedTime(), i.getIsDirectory(), i.getSize())) + .toList(); + return new UploadDirContents(rs.getPath(), items); } - catch (RouteNotConfiguredException ex) { - // node is down, return dummy data - return new UploadDirContents("/", new ArrayList<>()); + catch (Exception ex) { + logger.warn("Failed to list sideload dir", ex); + + // Return an empty list of items to avoid breaking the UI when a node is down + return new UploadDirContents("", List.of()); } } - public FileStorageContent listFileStorage(Context context, int node, FileStorageId fileId) { + public FileStorageContent listFileStorage(int node, FileStorageId fileId) { try { - return get(context, node, "/storage/"+fileId.id(), FileStorageContent.class).blockingFirst(); + var rs = stubPool.apiForNode(node).listFileStorage( + RpcFileStorageId.newBuilder() + .setFileStorageId(fileId.id()) + .build() + ); + + return new FileStorageContent(rs.getEntriesList().stream() + .map(e -> new FileStorageFile(e.getName(), e.getSize(), e.getLastModifiedTime())) + .toList()); } - catch (RouteNotConfiguredException ex) { - // node is down, return dummy data - return new FileStorageContent(new ArrayList<>()); + catch (Exception ex) { + logger.warn("Failed to list file storage", ex); + + // Return an empty list of items to avoid breaking the UI when a node is down + return new FileStorageContent(List.of()); } } diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java index ffbe168c..3bdbe95c 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorRemoteActorFactory.java @@ -1,7 +1,7 @@ package nu.marginalia.executor.client; import com.google.inject.Inject; -import jakarta.inject.Singleton; +import com.google.inject.Singleton; import nu.marginalia.model.gson.GsonFactory; import com.google.gson.Gson; import nu.marginalia.mq.MqMessage; diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java deleted file mode 100644 index 1874406b..00000000 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/model/load/LoadParameters.java +++ /dev/null @@ -1,10 +0,0 @@ -package nu.marginalia.executor.model.load; - -import nu.marginalia.storage.model.FileStorageId; - -import java.util.List; - -public record LoadParameters( - List ids -) { -} diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java index bb6a6f24..c1953f8f 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/upload/UploadDirItem.java @@ -1,29 +1,9 @@ package nu.marginalia.executor.upload; -import lombok.SneakyThrows; - -import java.nio.file.Files; -import java.nio.file.Path; -import java.time.LocalDateTime; -import java.time.ZoneId; -import java.time.format.DateTimeFormatter; - public record UploadDirItem ( String name, String lastModifiedTime, boolean isDirectory, long size ) { - - @SneakyThrows - public static UploadDirItem fromPath(Path path) { - boolean isDir = Files.isDirectory(path); - long size = isDir ? 0 : Files.size(path); - var mtime = Files.getLastModifiedTime(path); - - - return new UploadDirItem(path.toString(), - LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME), isDir, size); - } - } diff --git a/code/api/executor-api/src/main/protobuf/executor-api.proto b/code/api/executor-api/src/main/protobuf/executor-api.proto new file mode 100644 index 00000000..bc05844e --- /dev/null +++ b/code/api/executor-api/src/main/protobuf/executor-api.proto @@ -0,0 +1,105 @@ +syntax="proto3"; +package actorapi; + +option java_package="nu.marginalia.executor.api"; +option java_multiple_files=true; + +service ExecutorApi { + rpc startFsm(RpcFsmName) returns (Empty) {} + rpc stopFsm(RpcFsmName) returns (Empty) {} + + rpc stopProcess(RpcProcessId) returns (Empty) {} + + rpc triggerCrawl(RpcFileStorageId) returns (Empty) {} + rpc triggerRecrawl(RpcFileStorageId) returns (Empty) {} + rpc triggerConvert(RpcFileStorageId) returns (Empty) {} + rpc triggerConvertAndLoad(RpcFileStorageId) returns (Empty) {} + rpc loadProcessedData(RpcFileStorageIds) returns (Empty) {} + rpc calculateAdjacencies(Empty) returns (Empty) {} + + rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {} + rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {} + rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {} + rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {} + + rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {} + rpc exportAtags(RpcFileStorageId) returns (Empty) {} + rpc exportSampleData(RpcExportSampleData) returns (Empty) {} + rpc exportRssFeeds(RpcFileStorageId) returns (Empty) {} + rpc exportTermFrequencies(RpcFileStorageId) returns (Empty) {} + rpc downloadSampleData(RpcDownloadSampleData) returns (Empty) {} + rpc exportData(Empty) returns (Empty) {} + rpc restoreBackup(RpcFileStorageId) returns (Empty) {} + rpc getActorStates(Empty) returns (RpcActorRunStates) {} + rpc listSideloadDir(Empty) returns (RpcUploadDirContents) {} + rpc listFileStorage(RpcFileStorageId) returns (RpcFileStorageContent) {} +} + +message Empty {} +message RpcFsmName { + string actorName = 1; +} +message RpcProcessId { + string processId = 1; +} +message RpcFileStorageId { + int64 fileStorageId = 1; +} +message RpcFileStorageIds { + repeated int64 fileStorageIds = 1; +} +message RpcSideloadEncyclopedia { + string sourcePath = 1; + string baseUrl = 2; +} +message RpcSideloadDirtree { + string sourcePath = 1; +} +message RpcSideloadWarc { + string sourcePath = 1; +} +message RpcSideloadStackexchange { + string sourcePath = 1; +} +message RpcCrawlSpecFromDownload { + string description = 1; + string url = 2; +} +message RpcExportSampleData { + int64 fileStorageId = 1; + int32 size = 2; + string name = 3; +} +message RpcDownloadSampleData { + string sampleSet = 1; +} +message RpcActorRunStates { + int32 node = 1; + repeated RpcActorRunState actorRunStates = 2; +} +message RpcActorRunState { + string actorName = 1; + string state = 2; + string actorDescription = 3; + string stateDescription = 4; + bool terminal = 5; + bool canStart = 6; +} +message RpcUploadDirContents { + string path = 1; + repeated RpcUploadDirEntry entries = 2; +} +message RpcUploadDirEntry { + string name = 1; + string lastModifiedTime = 2; + bool isDirectory = 3; + int64 size = 4; +} +message RpcFileStorageContent { + repeated RpcFileStorageEntry entries = 1; +} +message RpcFileStorageEntry { + string name = 1; + int64 size = 2; + string lastModifiedTime = 3; +} diff --git a/code/api/index-api/build.gradle b/code/api/index-api/build.gradle index 51205960..08e629a4 100644 --- a/code/api/index-api/build.gradle +++ b/code/api/index-api/build.gradle @@ -17,6 +17,9 @@ sourceSets { } } } + +apply from: "$rootProject.projectDir/protobuf.gradle" + dependencies { implementation project(':code:common:model') implementation project(':code:common:config') @@ -42,30 +45,5 @@ dependencies { testImplementation libs.mockito } -protobuf { - protoc { - if (osdetector.os == "osx") { - artifact = "com.google.protobuf:protoc:3.0.2:osx-x86_64" - } else { - artifact = "com.google.protobuf:protoc:3.0.2" - } - } - plugins { - grpc { - if (osdetector.os == "osx") { - artifact = "io.grpc:protoc-gen-grpc-java:1.1.2:osx-x86_64" - } else { - artifact = "io.grpc:protoc-gen-grpc-java:1.1.2" - } - } - } - generateProtoTasks { - all().each { task -> - task.plugins { - grpc {} - } - } - } -} diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index 808e2a1f..7c334b8e 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -20,7 +20,6 @@ import nu.marginalia.service.id.ServiceId; import java.util.List; import javax.annotation.CheckReturnValue; -import java.util.ServiceConfigurationError; import java.util.UUID; @Singleton @@ -39,7 +38,7 @@ public class IndexClient extends AbstractDynamicClient { super(descriptors.forId(ServiceId.Index), GsonFactory::get); this.messageQueueFactory = messageQueueFactory; - String inboxName = ServiceId.Index.name; + String inboxName = ServiceId.Index.serviceName; String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID()); setTimeout(30); diff --git a/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java b/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java index f31c7ab8..eb22f6f0 100644 --- a/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java +++ b/code/common/config/src/main/java/nu/marginalia/storage/model/FileStorageId.java @@ -4,7 +4,7 @@ public record FileStorageId(long id) { public static FileStorageId parse(String str) { return new FileStorageId(Long.parseLong(str)); } - public static FileStorageId of(int storageId) { + public static FileStorageId of(long storageId) { return new FileStorageId(storageId); } diff --git a/code/common/service-client/build.gradle b/code/common/service-client/build.gradle index d34f362f..da1b1a71 100644 --- a/code/common/service-client/build.gradle +++ b/code/common/service-client/build.gradle @@ -24,6 +24,7 @@ dependencies { implementation libs.bundles.httpcomponents implementation libs.bundles.gson + implementation libs.bundles.grpc implementation libs.protobuf implementation libs.bundles.prometheus diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java b/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java index f4244112..b09ffa0c 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/ServiceMonitors.java @@ -114,7 +114,7 @@ public class ServiceMonitors { public boolean isServiceUp(ServiceId serviceId, int node) { synchronized (runningServices) { - return runningServices.contains(new ServiceNode(serviceId.name, node)); + return runningServices.contains(new ServiceNode(serviceId.serviceName, node)); } } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java b/code/common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java similarity index 60% rename from code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java rename to code/common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java index ed95b18c..0692c002 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/grpc/GrpcStubPool.java @@ -1,8 +1,8 @@ -package nu.marginalia.query; +package nu.marginalia.client.grpc; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import nu.marginalia.query.svc.NodeConfigurationWatcher; +import nu.marginalia.service.id.ServiceId; import java.util.List; import java.util.Map; @@ -10,26 +10,34 @@ import java.util.concurrent.*; import java.util.function.Function; import java.util.stream.Stream; -public abstract class QueryGrpcStubPool { +/** A pool of gRPC stubs for a service, with a separate stub for each node. + * Manages broadcast-style request. */ +public abstract class GrpcStubPool { + public GrpcStubPool(String serviceName) { + + this.serviceName = serviceName; + } + protected record ServiceAndNode(String service, int node) { public String getHostName() { return service+"-"+node; } } - private final NodeConfigurationWatcher nodeConfigurationWatcher; private final Map channels = new ConcurrentHashMap<>(); - private final Map actorRpcApis = new ConcurrentHashMap<>(); + private final Map apis = new ConcurrentHashMap<>(); private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor(); - QueryGrpcStubPool(NodeConfigurationWatcher nodeConfigurationWatcher) { - this.nodeConfigurationWatcher = nodeConfigurationWatcher; + private final String serviceName; + + public GrpcStubPool(ServiceId serviceId) { + this.serviceName = serviceId.serviceName; } /** Get an API stub for the given node */ - public STUB indexApi(int node) { - var san = new ServiceAndNode("index-service", node); - return actorRpcApis.computeIfAbsent(san, n -> + public STUB apiForNode(int node) { + var san = new ServiceAndNode(serviceName, node); + return apis.computeIfAbsent(san, n -> createStub(channels.computeIfAbsent(san, this::createChannel)) ); } @@ -41,8 +49,8 @@ public abstract class QueryGrpcStubPool { /** Invoke a function on each node, returning a list of futures in a terminal state, as per * ExecutorService$invokeAll */ public List> invokeAll(Function> callF) throws InterruptedException { - List> calls = nodeConfigurationWatcher.getQueryNodes().stream() - .map(id -> callF.apply(indexApi(id))) + List> calls = getEligibleNodes().stream() + .map(id -> callF.apply(apiForNode(id))) .toList(); return virtualExecutorService.invokeAll(calls); @@ -50,8 +58,8 @@ public abstract class QueryGrpcStubPool { /** Invoke a function on each node, returning a stream of results */ public Stream callEachSequential(Function call) { - return nodeConfigurationWatcher.getQueryNodes().stream() - .map(id -> call.apply(indexApi(id))); + return getEligibleNodes().stream() + .map(id -> call.apply(apiForNode(id))); } @@ -61,4 +69,7 @@ public abstract class QueryGrpcStubPool { */ public abstract STUB createStub(ManagedChannel channel); + /** Get the list of nodes that are eligible for broadcast-style requests */ + public abstract List getEligibleNodes(); + } diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java index 29eac14d..68008bb3 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptor.java @@ -8,7 +8,7 @@ public class ServiceDescriptor { public ServiceDescriptor(ServiceId id) { this.id = id; - this.name = id.name; + this.name = id.serviceName; } public ServiceDescriptor(ServiceId id, String host) { diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java index 94aeb10b..26f52b6b 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/id/ServiceId.java @@ -14,18 +14,19 @@ public enum ServiceId { Dating("dating-service"), Explorer("explorer-service"); - public final String name; - ServiceId(String name) { - this.name = name; + public final String serviceName; + + ServiceId(String serviceName) { + this.serviceName = serviceName; } public String withNode(int node) { - return name + ":" + node; + return serviceName + ":" + node; } public static ServiceId byName(String name) { for (ServiceId id : values()) { - if (id.name.equals(name)) { + if (id.serviceName.equals(name)) { return id; } } diff --git a/code/common/service/src/main/java/nu/marginalia/service/MainClass.java b/code/common/service/src/main/java/nu/marginalia/service/MainClass.java index 40303e48..be3080fb 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/MainClass.java +++ b/code/common/service/src/main/java/nu/marginalia/service/MainClass.java @@ -51,10 +51,10 @@ public abstract class MainClass { protected static void init(ServiceId id, String... args) { System.setProperty("log4j2.isThreadContextMapInheritable", "true"); System.setProperty("isThreadContextMapInheritable", "true"); - System.setProperty("service-name", id.name); + System.setProperty("service-name", id.serviceName); ConfigLoader.loadConfig( - ConfigLoader.getConfigPath(id.name) + ConfigLoader.getConfigPath(id.serviceName) ); initJdbc(); diff --git a/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java b/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java index df97b7b0..6e2b3399 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java +++ b/code/common/service/src/main/java/nu/marginalia/service/module/ServiceConfiguration.java @@ -22,6 +22,6 @@ public record ServiceConfiguration(ServiceId serviceId, int metricsPort, UUID instanceUuid) { public String serviceName() { - return serviceId.name; + return serviceId.serviceName; } } diff --git a/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java b/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java index b193b240..4de16cf0 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java +++ b/code/common/service/src/main/java/nu/marginalia/service/server/NodeStatusWatcher.java @@ -1,7 +1,7 @@ package nu.marginalia.service.server; import com.google.inject.name.Named; -import jakarta.inject.Inject; +import com.google.inject.Inject; import lombok.SneakyThrows; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.nodecfg.NodeConfigurationService; diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java index 10f33808..b23be402 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/RedirectControl.java @@ -1,6 +1,6 @@ package nu.marginalia.control; -import jakarta.inject.Inject; +import com.google.inject.Inject; import spark.ResponseTransformer; import java.io.IOException; diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index a707aefe..cf8acaf4 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -2,11 +2,9 @@ package nu.marginalia.control.node.svc; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.client.Context; import nu.marginalia.control.ControlValidationError; import nu.marginalia.control.RedirectControl; import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.executor.model.load.LoadParameters; import nu.marginalia.index.client.IndexClient; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.storage.FileStorageService; @@ -106,7 +104,7 @@ public class ControlNodeActionsService { if (!Set.of("sample-s", "sample-m", "sample-l", "sample-xl").contains(set)) throw new ControlValidationError("Invalid sample specified", "A valid sample data set must be specified", ".."); - executorClient.downloadSampleData(Context.fromRequest(request), Integer.parseInt(request.params("node")), set); + executorClient.downloadSampleData(Integer.parseInt(request.params("node")), set); logger.info("Downloading sample data set {}", set); @@ -126,7 +124,7 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD ENCYCLOPEDIA " + nodeId); - executorClient.sideloadEncyclopedia(Context.fromRequest(request), nodeId, sourcePath, baseUrl); + executorClient.sideloadEncyclopedia(nodeId, sourcePath, baseUrl); return ""; } @@ -139,7 +137,7 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD DIRTREE " + nodeId); - executorClient.sideloadDirtree(Context.fromRequest(request), nodeId, sourcePath); + executorClient.sideloadDirtree(nodeId, sourcePath); return ""; } @@ -151,7 +149,7 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD WARC " + nodeId); - executorClient.sideloadWarc(Context.fromRequest(request), nodeId, sourcePath); + executorClient.sideloadWarc(nodeId, sourcePath); return ""; } @@ -166,7 +164,8 @@ public class ControlNodeActionsService { eventLog.logEvent("USER-ACTION", "SIDELOAD STACKEXCHANGE " + nodeId); - executorClient.sideloadStackexchange(Context.fromRequest(request), nodeId, sourcePath); + executorClient.sideloadStackexchange(nodeId, sourcePath); + return ""; } @@ -184,7 +183,6 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.CRAWL_DATA, toCrawl); executorClient.triggerRecrawl( - Context.fromRequest(request), nodeId, toCrawl ); @@ -199,11 +197,7 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.CRAWL_SPEC, toCrawl); - executorClient.triggerCrawl( - Context.fromRequest(request), - nodeId, - toCrawl - ); + executorClient.triggerCrawl(nodeId, toCrawl); return ""; } @@ -216,14 +210,10 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, toProcess); if (isAutoload) { - executorClient.triggerConvertAndLoad(Context.fromRequest(request), - nodeId, - toProcess); + executorClient.triggerConvertAndLoad(nodeId, toProcess); } else { - executorClient.triggerConvert(Context.fromRequest(request), - nodeId, - toProcess); + executorClient.triggerConvert(nodeId, toProcess); } return ""; @@ -241,10 +231,7 @@ public class ControlNodeActionsService { changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, ids.toArray(new FileStorageId[0])); - executorClient.loadProcessedData(Context.fromRequest(request), - nodeId, - new LoadParameters(ids) - ); + executorClient.loadProcessedData(nodeId, ids); return ""; } @@ -254,7 +241,7 @@ public class ControlNodeActionsService { var toLoad = parseSourceFileStorageId(request.queryParams("source")); - executorClient.restoreBackup(Context.fromRequest(request), nodeId, toLoad); + executorClient.restoreBackup(nodeId, toLoad); return ""; } @@ -286,13 +273,13 @@ public class ControlNodeActionsService { throw new ControlValidationError("No url specified", "A url must be specified", ".."); } - executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), nodeId, description, url); + executorClient.createCrawlSpecFromDownload(nodeId, description, url); return ""; } private Object exportDbData(Request req, Response rsp) { - executorClient.exportData(Context.fromRequest(req), Integer.parseInt(req.params("id"))); + executorClient.exportData(Integer.parseInt(req.params("id"))); return ""; } @@ -302,9 +289,9 @@ public class ControlNodeActionsService { FileStorageId source = parseSourceFileStorageId(req.queryParams("source")); switch (exportType) { - case "atags" -> executorClient.exportAtags(Context.fromRequest(req), Integer.parseInt(req.params("id")), source); - case "rss" -> executorClient.exportRssFeeds(Context.fromRequest(req), Integer.parseInt(req.params("id")), source); - case "termFreq" -> executorClient.exportTermFrequencies(Context.fromRequest(req), Integer.parseInt(req.params("id")), source); + case "atags" -> executorClient.exportAtags(Integer.parseInt(req.params("id")), source); + case "rss" -> executorClient.exportRssFeeds(Integer.parseInt(req.params("id")), source); + case "termFreq" -> executorClient.exportTermFrequencies(Integer.parseInt(req.params("id")), source); default -> throw new ControlValidationError("No export type specified", "An export type must be specified", ".."); } @@ -316,7 +303,7 @@ public class ControlNodeActionsService { int size = Integer.parseInt(req.queryParams("size")); String name = req.queryParams("name"); - executorClient.exportSampleData(Context.fromRequest(req), Integer.parseInt(req.params("id")), source, size, name); + executorClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name); return ""; } diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java index 66a2b485..e4859eb3 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -107,13 +107,13 @@ public class ControlNodeService { } public Object startFsm(Request req, Response rsp) throws Exception { - executorClient.startFsm(Context.fromRequest(req), Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); + executorClient.startFsm(Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); return redirectToOverview(req); } public Object stopFsm(Request req, Response rsp) throws Exception { - executorClient.stopFsm(Context.fromRequest(req), Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); + executorClient.stopFsm(Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase()); return redirectToOverview(req); } @@ -132,7 +132,7 @@ public class ControlNodeService { int nodeId = Integer.parseInt(request.params("id")); String processBase = request.params("processBase"); - executorClient.stopProcess(Context.fromRequest(request), nodeId, processBase); + executorClient.stopProcess(nodeId, processBase); return ""; } @@ -153,7 +153,7 @@ public class ControlNodeService { return Map.of( "tab", Map.of("actors", true), "node", nodeConfigurationService.get(nodeId), - "actors", executorClient.getActorStates(Context.fromRequest(request), nodeId).states() + "actors", executorClient.getActorStates(nodeId).states() ); } @@ -164,7 +164,7 @@ public class ControlNodeService { "tab", Map.of("actions", true), "node", nodeConfigurationService.get(nodeId), "view", Map.of(request.queryParams("view"), true), - "uploadDirContents", executorClient.listSideloadDir(Context.fromRequest(request), nodeId), + "uploadDirContents", executorClient.listSideloadDir(nodeId), "allBackups", fileStorageService.getEachFileStorage(nodeId, FileStorageType.BACKUP), "allCrawlData", @@ -279,9 +279,9 @@ public class ControlNodeService { int nodeId = Integer.parseInt(request.params("id")); var config = nodeConfigurationService.get(nodeId); - var actors = executorClient.getActorStates(Context.fromRequest(request), nodeId).states(); - - actors.removeIf(actor -> actor.state().equals("MONITOR")); + var actors = executorClient.getActorStates(nodeId).states() + .stream().filter(actor -> !actor.state().equals("MONITOR")) + .toList(); return Map.of( "node", nodeConfigurationService.get(nodeId), @@ -308,7 +308,7 @@ public class ControlNodeService { } private List getEvents(int nodeId) { - List services = List.of(ServiceId.Index.name+":"+nodeId, ServiceId.Executor.name+":"+nodeId); + List services = List.of(ServiceId.Index.serviceName +":"+nodeId, ServiceId.Executor.serviceName +":"+nodeId); List events = new ArrayList<>(20); for (var service :services) { events.addAll(eventLogService.getLastEntriesForService(service, Long.MAX_VALUE, 10)); @@ -399,7 +399,7 @@ public class ControlNodeService { List files = new ArrayList<>(); - for (var execFile : executorClient.listFileStorage(context, node, fileId).files()) { + for (var execFile : executorClient.listFileStorage(node, fileId).files()) { files.add(new FileStorageFileModel( execFile.name(), execFile.modTime(), diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java index cfef6577..726491ec 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java @@ -2,7 +2,6 @@ package nu.marginalia.control.sys.svc; import com.google.inject.Inject; import lombok.SneakyThrows; -import nu.marginalia.client.Context; import nu.marginalia.control.ControlRendererFactory; import nu.marginalia.control.Redirects; import nu.marginalia.control.actor.ControlActor; @@ -56,7 +55,7 @@ public class ControlSysActionsService { * and lacks a proper internal API */ private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) { - String inboxName = ServiceId.Api.name + ":" + "0"; + String inboxName = ServiceId.Api.serviceName + ":" + "0"; String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString()); return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID()); } @@ -119,7 +118,7 @@ public class ControlSysActionsService { // This is technically not a partitioned operation, but we execute it at node 1 // and let the effects be global :-) - executorClient.calculateAdjacencies(Context.fromRequest(request), 1); + executorClient.calculateAdjacencies(1); return ""; } diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index 26f97808..e903d048 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -51,6 +51,7 @@ dependencies { implementation libs.bundles.slf4j implementation libs.spark + implementation libs.bundles.grpc implementation libs.gson implementation libs.prometheus implementation libs.notnull diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java index 2e570f05..13923302 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ActorApi.java @@ -2,17 +2,21 @@ package nu.marginalia.actor; import com.google.inject.Inject; import com.google.inject.Singleton; -import nu.marginalia.actor.proc.ProcessLivenessMonitorActor; +import nu.marginalia.actor.state.ActorStateInstance; +import nu.marginalia.executor.api.RpcActorRunState; +import nu.marginalia.executor.api.RpcActorRunStates; +import nu.marginalia.executor.api.RpcFsmName; +import nu.marginalia.executor.api.RpcProcessId; import nu.marginalia.mq.MqMessageState; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.process.ProcessService; import nu.marginalia.service.module.ServiceConfiguration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import spark.Spark; +import java.util.Comparator; + @Singleton public class ActorApi { private final ExecutorActorControlService actors; @@ -32,33 +36,20 @@ public class ActorApi { this.serviceConfiguration = serviceConfiguration; } - public Object startActorFromState(Request request, Response response) throws Exception { - ExecutorActor actor = translateActor(request.params("id")); - String state = request.params("state"); - actors.startFromJSON(actor, state, request.body()); - - return ""; - } - - public Object startActor(Request request, Response response) throws Exception { - ExecutorActor actor = translateActor(request.params("id")); + public void startActor(RpcFsmName actorName) throws Exception { + ExecutorActor actor = translateActor(actorName.getActorName()); actors.start(actor); - - return ""; } - public Object stopActor(Request request, Response response) { - ExecutorActor actor = translateActor(request.params("id")); - + public void stopActor(RpcFsmName actorName) { + ExecutorActor actor = translateActor(actorName.getActorName()); actors.stop(actor); - - return "OK"; } - public Object stopProcess(Request request, Response response) { - ProcessService.ProcessId id = ProcessService.translateExternalIdBase(request.params("id")); + public Object stopProcess(RpcProcessId processId) { + ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId()); try { String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node(); @@ -84,6 +75,45 @@ public class ActorApi { return "OK"; } + + public RpcActorRunStates getActorStates() { + var items = actors.getActorStates().entrySet().stream().map(e -> { + final var stateGraph = actors.getActorDefinition(e.getKey()); + + final ActorStateInstance state = e.getValue(); + final String actorDescription = stateGraph.describe(); + + final String machineName = e.getKey().name(); + final String stateName = state.name(); + + final String stateDescription = ""; + + final boolean terminal = state.isFinal(); + final boolean canStart = actors.isDirectlyInitializable(e.getKey()) && terminal; + + return RpcActorRunState + .newBuilder() + .setActorName(machineName) + .setState(stateName) + .setActorDescription(actorDescription) + .setStateDescription(stateDescription) + .setTerminal(terminal) + .setCanStart(canStart) + .build(); + + }) + .filter(s -> !s.getTerminal() || s.getCanStart()) + .sorted(Comparator.comparing(RpcActorRunState::getActorName)) + .toList(); + + return RpcActorRunStates.newBuilder() + .setNode(serviceConfiguration.node()) + .addAllActorRunStates(items) + .build(); + + } + + public ExecutorActor translateActor(String name) { try { return ExecutorActor.valueOf(name.toUpperCase()); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java index 8d456822..4e1804df 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/ExecutorActorStateMachines.java @@ -1,7 +1,7 @@ package nu.marginalia.actor; import com.google.gson.Gson; -import jakarta.inject.Singleton; +import com.google.inject.Singleton; import nu.marginalia.actor.state.ActorStateInstance; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.model.gson.GsonFactory; diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java new file mode 100644 index 00000000..0816c51b --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java @@ -0,0 +1,313 @@ +package nu.marginalia.executor; + +import com.google.inject.Inject; +import io.grpc.stub.StreamObserver; +import nu.marginalia.actor.ActorApi; +import nu.marginalia.executor.api.*; +import nu.marginalia.executor.svc.*; + +public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase { + private final ActorApi actorApi; + private final ExportService exportService; + private final SideloadService sideloadService; + private final BackupService backupService; + private final TransferService transferService; + private final ProcessingService processingService; + + @Inject + public ExecutorGrpcService(ActorApi actorApi, + ExportService exportService, + SideloadService sideloadService, + BackupService backupService, + TransferService transferService, + ProcessingService processingService) + { + this.actorApi = actorApi; + this.exportService = exportService; + this.sideloadService = sideloadService; + this.backupService = backupService; + this.transferService = transferService; + this.processingService = processingService; + } + + @Override + public void startFsm(RpcFsmName request, StreamObserver responseObserver) { + try { + actorApi.startActor(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void stopFsm(RpcFsmName request, StreamObserver responseObserver) { + try { + actorApi.stopActor(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void stopProcess(RpcProcessId request, StreamObserver responseObserver) { + try { + actorApi.stopProcess(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerCrawl(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startCrawl(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerRecrawl(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startRecrawl(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerConvert(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startConversion(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void triggerConvertAndLoad(RpcFileStorageId request, StreamObserver responseObserver) { + try { + processingService.startConvertLoad(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void loadProcessedData(RpcFileStorageIds request, StreamObserver responseObserver) { + try { + processingService.startLoad(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void calculateAdjacencies(Empty request, StreamObserver responseObserver) { + try { + processingService.startAdjacencyCalculation(); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadEncyclopedia(RpcSideloadEncyclopedia request, StreamObserver responseObserver) { + try { + sideloadService.sideloadEncyclopedia(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadDirtree(RpcSideloadDirtree request, StreamObserver responseObserver) { + try { + sideloadService.sideloadDirtree(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadWarc(RpcSideloadWarc request, StreamObserver responseObserver) { + try { + sideloadService.sideloadWarc(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void sideloadStackexchange(RpcSideloadStackexchange request, StreamObserver responseObserver) { + try { + sideloadService.sideloadStackexchange(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request, StreamObserver responseObserver) { + try { + processingService.createCrawlSpecFromDownload(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportAtags(RpcFileStorageId request, StreamObserver responseObserver) { + try { + exportService.exportAtags(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportSampleData(RpcExportSampleData request, StreamObserver responseObserver) { + try { + exportService.exportSampleData(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportRssFeeds(RpcFileStorageId request, StreamObserver responseObserver) { + try { + exportService.exportFeeds(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportTermFrequencies(RpcFileStorageId request, StreamObserver responseObserver) { + try { + exportService.exportTermFrequencies(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void downloadSampleData(RpcDownloadSampleData request, StreamObserver responseObserver) { + try { + sideloadService.downloadSampleData(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void exportData(Empty request, StreamObserver responseObserver) { + try { + exportService.exportData(); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + + } + + @Override + public void restoreBackup(RpcFileStorageId request, StreamObserver responseObserver) { + try { + backupService.restore(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void getActorStates(Empty request, StreamObserver responseObserver) { + responseObserver.onNext(actorApi.getActorStates()); + responseObserver.onCompleted(); + } + + @Override + public void listSideloadDir(Empty request, StreamObserver responseObserver) { + try { + responseObserver.onNext(sideloadService.listUploadDir()); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } + + @Override + public void listFileStorage(RpcFileStorageId request, StreamObserver responseObserver) { + try { + responseObserver.onNext(transferService.listFiles(request)); + responseObserver.onCompleted(); + } + catch (Exception e) { + responseObserver.onError(e); + } + } +} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java index 613ca18f..ac567467 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorSvc.java @@ -2,27 +2,19 @@ package nu.marginalia.executor; import com.google.gson.Gson; import com.google.inject.Inject; +import io.grpc.ServerBuilder; import nu.marginalia.actor.ExecutorActor; -import nu.marginalia.actor.ActorApi; import nu.marginalia.actor.ExecutorActorControlService; -import nu.marginalia.actor.state.ActorStateInstance; -import nu.marginalia.executor.model.ActorRunState; -import nu.marginalia.executor.model.ActorRunStates; -import nu.marginalia.executor.svc.*; +import nu.marginalia.executor.svc.TransferService; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; import nu.marginalia.service.server.mq.MqRequest; -import nu.marginalia.storage.FileStorageService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import spark.Spark; import java.io.IOException; import java.sql.SQLException; -import java.util.Comparator; -import java.util.concurrent.ConcurrentHashMap; // Weird name for this one to not have clashes with java.util.concurrent.ExecutorService public class ExecutorSvc extends Service { @@ -36,51 +28,19 @@ public class ExecutorSvc extends Service { @Inject public ExecutorSvc(BaseServiceParams params, ExecutorActorControlService actorControlService, - ProcessingService processingService, - SideloadService sideloadService, - BackupService backupService, - ExportService exportService, + ExecutorGrpcService executorGrpcService, Gson gson, - TransferService transferService, - ActorApi actorApi) { + TransferService transferService) throws IOException { super(params); this.params = params; this.gson = gson; this.actorControlService = actorControlService; this.transferService = transferService; - Spark.post("/actor/:id/start", actorApi::startActor); - Spark.post("/actor/:id/start/:state", actorApi::startActorFromState); - Spark.post("/actor/:id/stop", actorApi::stopActor); - - Spark.get("/actor", this::getActorStates, gson::toJson); - - Spark.post("/process/:id/stop", actorApi::stopProcess); - - Spark.post("/process/crawl/:fid", processingService::startCrawl); - Spark.post("/process/recrawl", processingService::startRecrawl); - Spark.post("/process/convert/:fid", processingService::startConversion); - Spark.post("/process/convert-load/:fid", processingService::startConvertLoad); - Spark.post("/process/crawl-spec/from-download", processingService::createCrawlSpecFromDownload); - Spark.post("/process/load", processingService::startLoad); - Spark.post("/process/adjacency-calculation", processingService::startAdjacencyCalculation); - - Spark.get("/sideload/", sideloadService::listUploadDir, gson::toJson); - Spark.post("/sideload/dirtree", sideloadService::sideloadDirtree); - Spark.post("/sideload/warc", sideloadService::sideloadWarc); - Spark.post("/sideload/stackexchange", sideloadService::sideloadStackexchange); - Spark.post("/sideload/encyclopedia", sideloadService::sideloadEncyclopedia); - - Spark.post("/action/download-sample-data", sideloadService::downloadSampleData); - - Spark.post("/export/atags", exportService::exportAtags); - Spark.post("/export/sample-data", exportService::exportSampleData); - Spark.post("/export/feeds", exportService::exportFeeds); - Spark.post("/export/termfreq", exportService::exportTermFrequencies); - Spark.post("/export/data", exportService::exportData); - - Spark.post("/backup/:fid/restore", backupService::restore); - Spark.get("/storage/:fid", transferService::listFiles, gson::toJson); + var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1) + .addService(executorGrpcService) + .build(); + grpcServer.start(); Spark.get("/transfer/file/:fid", transferService::transferFile); @@ -123,37 +83,4 @@ public class ExecutorSvc extends Service { return "OK"; } - - private final ConcurrentHashMap actorStateDescriptions = new ConcurrentHashMap<>(); - - private ActorRunStates getActorStates(Request request, Response response) { - var items = actorControlService.getActorStates().entrySet().stream().map(e -> { - final var stateGraph = actorControlService.getActorDefinition(e.getKey()); - - final ActorStateInstance state = e.getValue(); - final String actorDescription = stateGraph.describe(); - - final String machineName = e.getKey().name(); - final String stateName = state.name(); - - final String stateDescription = ""; - - final boolean terminal = state.isFinal(); - final boolean canStart = actorControlService.isDirectlyInitializable(e.getKey()) && terminal; - - return new ActorRunState(machineName, - stateName, - actorDescription, - stateDescription, - terminal, - canStart); - }) - .filter(s -> !s.terminal() || s.canStart()) - .sorted(Comparator.comparing(ActorRunState::name)) - .toList(); - - return new ActorRunStates(params.configuration.node(), items); - } - - } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java index 05acceb0..45f8c622 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/BackupService.java @@ -4,9 +4,8 @@ import com.google.inject.Inject; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.RestoreBackupActor; +import nu.marginalia.executor.api.RpcFileStorageId; import nu.marginalia.storage.model.FileStorageId; -import spark.Request; -import spark.Response; public class BackupService { private final ExecutorActorControlService actorControlService; @@ -16,9 +15,8 @@ public class BackupService { this.actorControlService = actorControlService; } - public Object restore(Request request, Response response) throws Exception { - var fid = FileStorageId.parse(request.params("fid")); + public void restore(RpcFileStorageId request) throws Exception { + var fid = FileStorageId.of(request.getFileStorageId()); actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, new RestoreBackupActor.Restore(fid)); - return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java index 388af5b5..9f941ab9 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ExportService.java @@ -3,13 +3,10 @@ package nu.marginalia.executor.svc; import com.google.inject.Inject; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; -import nu.marginalia.actor.task.ConvertActor; -import nu.marginalia.actor.task.ExportAtagsActor; -import nu.marginalia.actor.task.ExportDataActor; -import nu.marginalia.actor.task.ExportSampleDataActor; +import nu.marginalia.actor.task.*; +import nu.marginalia.executor.api.RpcExportSampleData; +import nu.marginalia.executor.api.RpcFileStorageId; import nu.marginalia.storage.model.FileStorageId; -import spark.Request; -import spark.Response; public class ExportService { private final ExecutorActorControlService actorControlService; @@ -19,33 +16,36 @@ public class ExportService { this.actorControlService = actorControlService; } - public Object exportData(Request request, Response response) throws Exception { + public void exportData() throws Exception { actorControlService.startFrom(ExecutorActor.EXPORT_DATA, new ExportDataActor.Export()); - return ""; } - public Object exportSampleData(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA, new ExportSampleDataActor.Export( - FileStorageId.parse(request.queryParams("fid")), - Integer.parseInt(request.queryParams("size")), - request.queryParams("name") - )); - return ""; + public void exportSampleData(RpcExportSampleData request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA, + new ExportSampleDataActor.Export( + FileStorageId.of(request.getFileStorageId()), + request.getSize(), + request.getName() + ) + ); } - public Object exportAtags(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid")))); - return ""; + public void exportAtags(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, + new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId())) + ); } - public Object exportFeeds(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid")))); - return ""; - } - public Object exportTermFrequencies(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid")))); - return ""; + public void exportFeeds(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, + new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId())) + ); } + public void exportTermFrequencies(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, + new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId())) + ); + } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java index f8d83d98..bee5c2c1 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/ProcessingService.java @@ -1,84 +1,62 @@ package nu.marginalia.executor.svc; -import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.*; +import nu.marginalia.executor.api.RpcCrawlSpecFromDownload; +import nu.marginalia.executor.api.RpcFileStorageId; +import nu.marginalia.executor.api.RpcFileStorageIds; import nu.marginalia.storage.model.FileStorageId; -import nu.marginalia.executor.model.load.LoadParameters; -import spark.Request; -import spark.Response; + +import java.util.stream.Collectors; public class ProcessingService { private final ExecutorActorControlService actorControlService; - private final Gson gson; @Inject - public ProcessingService(ExecutorActorControlService actorControlService, - Gson gson) { + public ProcessingService(ExecutorActorControlService actorControlService) { this.actorControlService = actorControlService; - this.gson = gson; } - public Object startRecrawl(Request request, Response response) throws Exception { - var crawlId = gson.fromJson(request.body(), FileStorageId.class); - - actorControlService.startFrom( - ExecutorActor.RECRAWL, - new RecrawlActor.Initial(crawlId, false) - ); - - return ""; + public void startRecrawl(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.RECRAWL, + new RecrawlActor.Initial(FileStorageId.of(request.getFileStorageId()), false)); } - public Object startCrawl(Request request, Response response) throws Exception { + public void startCrawl(RpcFileStorageId request) throws Exception { actorControlService.startFrom(ExecutorActor.CRAWL, - new CrawlActor.Initial(FileStorageId.parse(request.params("fid")))); - - return ""; + new CrawlActor.Initial(FileStorageId.of(request.getFileStorageId()))); } - public Object startConversion(Request request, Response response) throws Exception { + public void startConversion(RpcFileStorageId request) throws Exception { actorControlService.startFrom(ExecutorActor.CONVERT, - new ConvertActor.Convert(FileStorageId.parse(request.params("fid")))); - - return ""; + new ConvertActor.Convert(FileStorageId.of(request.getFileStorageId()))); } - public Object startConvertLoad(Request request, Response response) throws Exception { - actorControlService.startFrom( - ExecutorActor.CONVERT_AND_LOAD, - new ConvertAndLoadActor.Initial(FileStorageId.parse(request.params("fid"))) + public void startConvertLoad(RpcFileStorageId request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD, + new ConvertAndLoadActor.Initial(FileStorageId.of(request.getFileStorageId()))); + } + + public void startLoad(RpcFileStorageIds request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD, + new ConvertAndLoadActor.Load(request.getFileStorageIdsList() + .stream() + .map(FileStorageId::of) + .collect(Collectors.toList())) ); - - return ""; } - - public Object startLoad(Request request, Response response) throws Exception { - var params = gson.fromJson(request.body(), LoadParameters.class); - - // Start the FSM from the intermediate state that triggers the load - actorControlService.startFrom( - ExecutorActor.CONVERT_AND_LOAD, - new ConvertAndLoadActor.Load(params.ids()) - ); - - return ""; - } - - public Object startAdjacencyCalculation(Request request, Response response) throws Exception { + public void startAdjacencyCalculation() throws Exception { actorControlService.startFrom(ExecutorActor.ADJACENCY_CALCULATION, new TriggerAdjacencyCalculationActor.Run()); - return ""; } - public Object createCrawlSpecFromDownload(Request request, Response response) throws Exception { + public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request) throws Exception { actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR, new CrawlJobExtractorActor.CreateFromUrl( - request.queryParamOrDefault("description", ""), - request.queryParamOrDefault("url", "")) + request.getDescription(), + request.getUrl()) ); - return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java index 8ef39c12..7daefe6e 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java @@ -6,18 +6,16 @@ import nu.marginalia.actor.ExecutorActor; import nu.marginalia.actor.ExecutorActorControlService; import nu.marginalia.actor.task.ConvertActor; import nu.marginalia.actor.task.DownloadSampleActor; -import nu.marginalia.executor.upload.UploadDirContents; -import nu.marginalia.executor.upload.UploadDirItem; +import nu.marginalia.executor.api.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import spark.Request; -import spark.Response; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; -import java.util.ArrayList; -import java.util.List; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.format.DateTimeFormatter; public class SideloadService { private final ExecutorActorControlService actorControlService; @@ -28,43 +26,61 @@ public class SideloadService { this.actorControlService = actorControlService; } - public Object sideloadDirtree(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertDirtree(request.queryParams("path"))); - return ""; + public void sideloadDirtree(RpcSideloadDirtree request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertDirtree(request.getSourcePath()) + ); } - public Object sideloadWarc(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertWarc(request.queryParams("path"))); - return ""; + + public void sideloadWarc(RpcSideloadWarc request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertWarc(request.getSourcePath()) + ); } - public Object sideloadEncyclopedia(Request request, Response response) throws Exception { + + public void sideloadEncyclopedia(RpcSideloadEncyclopedia request) throws Exception { actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertEncyclopedia( - request.queryParams("path"), - request.queryParams("baseUrl") + request.getSourcePath(), + request.getBaseUrl() )); - return ""; } - public Object sideloadStackexchange(Request request, Response response) throws Exception { - actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertStackexchange(request.queryParams("path"))); - return ""; + public void sideloadStackexchange(RpcSideloadStackexchange request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertStackexchange(request.getSourcePath()) + ); } - public UploadDirContents listUploadDir(Request request, Response response) throws IOException { + public RpcUploadDirContents listUploadDir() throws IOException { Path uploadDir = WmsaHome.getUploadDir(); try (var items = Files.list(uploadDir)) { - return new UploadDirContents(uploadDir.toString(), - items.map(UploadDirItem::fromPath).toList()); - } + var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString()); + var iter = items.iterator(); + while (iter.hasNext()) { + var path = iter.next(); + + boolean isDir = Files.isDirectory(path); + long size = isDir ? 0 : Files.size(path); + var mtime = Files.getLastModifiedTime(path); + + builder.addEntriesBuilder() + .setName(path.toString()) + .setIsDirectory(isDir) + .setLastModifiedTime( + LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME)) + .setSize(size); + } + + return builder.build(); + } } - public Object downloadSampleData(Request request, Response response) throws Exception { - String sampleSet = request.queryParams("set"); + public void downloadSampleData(RpcDownloadSampleData request) throws Exception { + String sampleSet = request.getSampleSet(); actorControlService.startFrom(ExecutorActor.DOWNLOAD_SAMPLE, new DownloadSampleActor.Run(sampleSet)); - - return ""; } } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java index ed84695f..377bd354 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/TransferService.java @@ -5,11 +5,12 @@ import com.google.inject.Inject; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; import nu.marginalia.client.Context; +import nu.marginalia.executor.api.RpcFileStorageContent; +import nu.marginalia.executor.api.RpcFileStorageEntry; +import nu.marginalia.executor.api.RpcFileStorageId; import nu.marginalia.executor.client.ExecutorClient; import nu.marginalia.executor.model.transfer.TransferItem; import nu.marginalia.executor.model.transfer.TransferSpec; -import nu.marginalia.executor.storage.FileStorageContent; -import nu.marginalia.executor.storage.FileStorageFile; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.mq.persistence.MqPersistence; import nu.marginalia.process.log.WorkLog; @@ -74,30 +75,31 @@ public class TransferService { } - public FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException { - FileStorageId fileStorageId = FileStorageId.parse(request.params("fid")); + public RpcFileStorageContent listFiles(RpcFileStorageId request) throws SQLException, IOException { + FileStorageId fileStorageId = FileStorageId.of(request.getFileStorageId()); var storage = fileStorageService.getStorage(fileStorageId); - List files; + var builder = RpcFileStorageContent.newBuilder(); + try (var fs = Files.list(storage.asPath())) { - files = fs.filter(Files::isRegularFile) + fs.filter(Files::isRegularFile) .map(this::createFileModel) - .sorted(Comparator.comparing(FileStorageFile::name)) - .toList(); + .sorted(Comparator.comparing(RpcFileStorageEntry::getName)) + .forEach(builder::addEntries); } - return new FileStorageContent(files); + return builder.build(); } @SneakyThrows - private FileStorageFile createFileModel(Path path) { - return new FileStorageFile( - path.toFile().getName(), - Files.size(path), - Files.getLastModifiedTime(path).toInstant().toString() - ); + private RpcFileStorageEntry createFileModel(Path path) { + return RpcFileStorageEntry.newBuilder() + .setName(path.toFile().getName()) + .setSize(Files.size(path)) + .setLastModifiedTime(Files.getLastModifiedTime(path).toInstant().toString()) + .build(); } public TransferSpec getTransferSpec(Request request, Response response) throws SQLException { diff --git a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java b/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java deleted file mode 100644 index 29fdf8e3..00000000 --- a/code/services-core/executor-service/src/test/java/nu/marginalia/executor/ExecutorSvcApiIntegrationTest.java +++ /dev/null @@ -1,155 +0,0 @@ -package nu.marginalia.executor; - -import com.google.inject.AbstractModule; -import com.google.inject.Guice; -import com.google.inject.Inject; -import com.google.inject.Provides; -import nu.marginalia.actor.ExecutorActor; -import nu.marginalia.actor.ExecutorActorControlService; -import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor; -import nu.marginalia.client.Context; -import nu.marginalia.client.route.RouteProvider; -import nu.marginalia.process.ProcessOutboxes; -import nu.marginalia.process.ProcessService; -import nu.marginalia.storage.FileStorageService; -import nu.marginalia.storage.model.FileStorageId; -import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.index.client.IndexClient; -import nu.marginalia.mq.MessageQueueFactory; -import nu.marginalia.mq.inbox.MqAsynchronousInbox; -import nu.marginalia.service.control.ServiceEventLog; -import nu.marginalia.service.control.ServiceHeartbeatImpl; -import nu.marginalia.service.descriptor.ServiceDescriptor; -import nu.marginalia.service.descriptor.ServiceDescriptors; -import nu.marginalia.service.id.ServiceId; -import nu.marginalia.service.module.ServiceConfiguration; -import nu.marginalia.service.server.Initialization; -import nu.marginalia.service.server.MetricsServer; -import org.junit.jupiter.api.*; -import org.mockito.Mockito; -import spark.Spark; - -import java.nio.file.Path; -import java.util.List; -import java.util.UUID; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; - -@Tag("slow") -public class ExecutorSvcApiIntegrationTest { - - static TestInstances testInstances; - static final int port = 9999; - - @BeforeAll - public static void setUpAll() { - RouteProvider.setDefaultPort(port); - var injector = Guice.createInjector(new TestModule()); - testInstances = injector.getInstance(TestInstances.class); - injector.getInstance(Initialization.class).setReady(); - } - - @AfterAll - public static void tearDownAll() { - RouteProvider.resetDefaultPort(); - } - - @BeforeEach - public void setUp() { - Mockito.reset(testInstances.actorControlService); - } - - @AfterAll - public static void tearDown() { - Spark.stop(); - } - - @Test - public void startStartActor() throws Exception { - testInstances.client.startFsm(Context.internal(), 0, "crawl"); - Mockito.verify(testInstances.actorControlService).start(ExecutorActor.CRAWL); - } - - @Test - public void startStopActor() { - testInstances.client.stopFsm(Context.internal(), 0, "crawl"); - - Mockito.verify(testInstances.actorControlService).stop(ExecutorActor.CRAWL); - } - - @Test - public void triggerCrawl() throws Exception { - testInstances.client.triggerCrawl(Context.internal(), 0, FileStorageId.of(1)); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL), any()); - } - - @Test - public void triggerRecrawl() throws Exception { - testInstances.client.triggerRecrawl(Context.internal(), 0, - new FileStorageId(0)); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RECRAWL), any()); - } - - - @Test - public void triggerProcessAndLoad() throws Exception { - testInstances.client.triggerConvertAndLoad(Context.internal(), 0, FileStorageId.of(1)); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT_AND_LOAD), any()); - } - - @Test - public void calculateAdjacencies() throws Exception { - testInstances.client.calculateAdjacencies(Context.internal(), 0); - - Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.ADJACENCY_CALCULATION), eq(new TriggerAdjacencyCalculationActor.Run())); - } - -} - -class TestInstances { - @Inject - ExecutorSvc service; - @Inject - ExecutorClient client; - @Inject - ExecutorActorControlService actorControlService; -} -class TestModule extends AbstractModule { - - @Override - public void configure() { - System.setProperty("service-name", "test"); - bind(ExecutorActorControlService.class).toInstance(Mockito.mock(ExecutorActorControlService.class)); - bind(FileStorageService.class).toInstance(Mockito.mock(FileStorageService.class)); - bind(ProcessService.class).toInstance(Mockito.mock(ProcessService.class)); - bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class)); - bind(ServiceHeartbeatImpl.class).toInstance(Mockito.mock(ServiceHeartbeatImpl.class)); - bind(MetricsServer.class).toInstance(Mockito.mock(MetricsServer.class)); - bind(IndexClient.class).toInstance(Mockito.mock(IndexClient.class)); - bind(ProcessOutboxes.class).toInstance(Mockito.mock(ProcessOutboxes.class)); - bind(ServiceConfiguration.class) - .toInstance(new ServiceConfiguration(ServiceId.Executor, - 0, "127.0.0.1", ExecutorSvcApiIntegrationTest.port, -1, UUID.randomUUID())); - } - - @Provides - public ServiceDescriptors getServiceDescriptors() { - return new ServiceDescriptors( - List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1")) - ); - } - - @Provides - public MessageQueueFactory getMessageQueueFactory() { - var mock = Mockito.mock(MessageQueueFactory.class); - - Mockito.when(mock.createAsynchronousInbox(Mockito.anyString(), Mockito.anyInt(), any())). - thenReturn(Mockito.mock(MqAsynchronousInbox.class)); - - return mock; - } -} \ No newline at end of file diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/NodeConfigurationWatcher.java similarity index 95% rename from code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java rename to code/services-core/query-service/src/main/java/nu/marginalia/query/NodeConfigurationWatcher.java index f26da0f8..a7117387 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/svc/NodeConfigurationWatcher.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/NodeConfigurationWatcher.java @@ -1,4 +1,4 @@ -package nu.marginalia.query.svc; +package nu.marginalia.query; import com.google.inject.Inject; import lombok.SneakyThrows; @@ -6,7 +6,6 @@ import nu.marginalia.nodecfg.NodeConfigurationService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java index 363b493f..15da22bd 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryBasicInterface.java @@ -4,11 +4,9 @@ import com.google.gson.Gson; import com.google.inject.Inject; import nu.marginalia.client.Context; import nu.marginalia.index.client.IndexClient; -import nu.marginalia.index.client.model.query.SearchSetIdentifier; import nu.marginalia.index.query.limit.QueryLimits; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.query.model.QueryParams; -import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.renderer.MustacheRenderer; import nu.marginalia.renderer.RendererFactory; import nu.marginalia.query.svc.QueryFactory; diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java index 78cfc637..2eb4c01c 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java @@ -3,28 +3,34 @@ package nu.marginalia.query; import com.google.inject.Inject; import io.grpc.ManagedChannel; import io.grpc.stub.StreamObserver; +import nu.marginalia.client.grpc.GrpcStubPool; import nu.marginalia.index.api.IndexDomainLinksApiGrpc; import nu.marginalia.index.api.RpcDomainIdCount; import nu.marginalia.index.api.RpcDomainIdList; import nu.marginalia.index.api.RpcDomainIdPairs; -import nu.marginalia.query.svc.NodeConfigurationWatcher; +import nu.marginalia.service.id.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; + public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase { private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class); - private final NodeConfigurationWatcher nodeConfigurationWatcher; - private final QueryGrpcStubPool stubPool; + private final GrpcStubPool stubPool; @Inject public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) { - this.nodeConfigurationWatcher = nodeConfigurationWatcher; - stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) { + stubPool = new GrpcStubPool<>(ServiceId.Index) { @Override public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) { return IndexDomainLinksApiGrpc.newBlockingStub(channel); } + + @Override + public List getEligibleNodes() { + return nodeConfigurationWatcher.getQueryNodes(); + } }; } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java index 5f59ee15..ef253253 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java @@ -4,11 +4,12 @@ import com.google.inject.Inject; import io.grpc.ManagedChannel; import io.prometheus.client.Histogram; import lombok.SneakyThrows; +import nu.marginalia.client.grpc.GrpcStubPool; import nu.marginalia.db.DomainBlacklist; import nu.marginalia.index.api.*; import nu.marginalia.model.id.UrlIdCodec; -import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.QueryFactory; +import nu.marginalia.service.id.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,23 +27,29 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { .help("QS-side query time (GRPC endpoint)") .register(); - private final QueryGrpcStubPool stubPool; + private final GrpcStubPool stubPool; private final QueryFactory queryFactory; private final DomainBlacklist blacklist; - private final NodeConfigurationWatcher nodeConfigurationWatcher; @Inject - public QueryGRPCService(QueryFactory queryFactory, DomainBlacklist blacklist, NodeConfigurationWatcher nodeConfigurationWatcher) { + public QueryGRPCService(QueryFactory queryFactory, + DomainBlacklist blacklist, + NodeConfigurationWatcher nodeConfigurationWatcher) + { this.queryFactory = queryFactory; this.blacklist = blacklist; - this.nodeConfigurationWatcher = nodeConfigurationWatcher; - stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) { + stubPool = new GrpcStubPool<>(ServiceId.Index) { @Override public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) { return IndexApiGrpc.newBlockingStub(channel); } + + @Override + public List getEligibleNodes() { + return nodeConfigurationWatcher.getQueryNodes(); + } }; } diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java index 7e864635..bd5eaebd 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java @@ -12,7 +12,6 @@ import nu.marginalia.index.client.model.results.DecoratedSearchResultItem; import nu.marginalia.index.client.model.results.SearchResultSet; import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryResponse; -import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.QueryFactory; import nu.marginalia.service.server.BaseServiceParams; import nu.marginalia.service.server.Service; diff --git a/protobuf.gradle b/protobuf.gradle new file mode 100644 index 00000000..af2f5317 --- /dev/null +++ b/protobuf.gradle @@ -0,0 +1,29 @@ +// Boilerplate configuration that should be included whenever protobufs are used +// see e.g. index-api's build.gradle + +protobuf { + protoc { + if (osdetector.os == "osx") { + artifact = "com.google.protobuf:protoc:3.0.2:osx-x86_64" + } else { + artifact = "com.google.protobuf:protoc:3.0.2" + } + } + plugins { + grpc { + if (osdetector.os == "osx") { + artifact = "io.grpc:protoc-gen-grpc-java:1.1.2:osx-x86_64" + } else { + artifact = "io.grpc:protoc-gen-grpc-java:1.1.2" + } + } + } + + generateProtoTasks { + all().each { task -> + task.plugins { + grpc {} + } + } + } +} \ No newline at end of file