Make executor API talk GRPC (#75)
* (executor-api) Make executor API talk GRPC The executor's REST API was very fragile and annoying to work with, lacking even basic type safety. Migrate to use GRPC instead. GRPC is a bit of a pain with how verbose it is, but that is probably a lesser evil. This is a fairly straightforward change, but it's also large so a solid round of testing is needed... The change set breaks out the GrpcStubPool previously residing in the QueryService, and makes it available to all clients. ServiceId.name was also renamed to avoid the very dangerous clash with Enum.name(). The boilerplate needed for grpc was also extracted into a common gradle file for inclusion into the appropriate build.gradle-files.
This commit is contained in:
parent
d83a3bf4e2
commit
e8de468b0b
@ -1,6 +1,7 @@
|
||||
plugins {
|
||||
id 'java'
|
||||
id 'jvm-test-suite'
|
||||
id "com.google.protobuf" version "0.9.4"
|
||||
}
|
||||
|
||||
java {
|
||||
@ -9,6 +10,18 @@ java {
|
||||
}
|
||||
}
|
||||
|
||||
apply from: "$rootProject.projectDir/protobuf.gradle"
|
||||
|
||||
sourceSets {
|
||||
main {
|
||||
proto {
|
||||
srcDir 'src/main/protobuf'
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:api:index-api')
|
||||
@ -25,6 +38,8 @@ dependencies {
|
||||
implementation libs.guice
|
||||
implementation libs.rxjava
|
||||
implementation libs.protobuf
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.gson
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
|
@ -1,154 +1,282 @@
|
||||
package nu.marginalia.executor.client;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.reactivex.rxjava3.core.Observable;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.client.AbstractDynamicClient;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.client.exception.RouteNotConfiguredException;
|
||||
import nu.marginalia.client.grpc.GrpcStubPool;
|
||||
import nu.marginalia.executor.api.*;
|
||||
import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
|
||||
import nu.marginalia.executor.model.ActorRunState;
|
||||
import nu.marginalia.executor.model.ActorRunStates;
|
||||
import nu.marginalia.executor.model.load.LoadParameters;
|
||||
import nu.marginalia.executor.model.transfer.TransferItem;
|
||||
import nu.marginalia.executor.model.transfer.TransferSpec;
|
||||
import nu.marginalia.executor.storage.FileStorageContent;
|
||||
import nu.marginalia.executor.storage.FileStorageFile;
|
||||
import nu.marginalia.executor.upload.UploadDirContents;
|
||||
import nu.marginalia.executor.upload.UploadDirItem;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.nodecfg.model.NodeConfiguration;
|
||||
import nu.marginalia.service.descriptor.ServiceDescriptors;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.storage.model.FileStorage;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.OutputStream;
|
||||
import java.net.URLEncoder;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Singleton
|
||||
public class ExecutorClient extends AbstractDynamicClient {
|
||||
private final GrpcStubPool<ExecutorApiBlockingStub> stubPool;
|
||||
private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
|
||||
|
||||
@Inject
|
||||
public ExecutorClient(ServiceDescriptors descriptors) {
|
||||
public ExecutorClient(ServiceDescriptors descriptors, NodeConfigurationService nodeConfigurationService) {
|
||||
super(descriptors.forId(ServiceId.Executor), GsonFactory::get);
|
||||
|
||||
stubPool = new GrpcStubPool<>(ServiceId.Executor) {
|
||||
@Override
|
||||
public ExecutorApiBlockingStub createStub(ManagedChannel channel) {
|
||||
return ExecutorApiGrpc.newBlockingStub(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Integer> getEligibleNodes() {
|
||||
return nodeConfigurationService.getAll()
|
||||
.stream()
|
||||
.map(NodeConfiguration::node)
|
||||
.toList();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void startFsm(Context ctx, int node, String actorName) {
|
||||
post(ctx, node, "/actor/"+actorName+"/start", "").blockingSubscribe();
|
||||
public void startFsm(int node, String actorName) {
|
||||
stubPool.apiForNode(node).startFsm(
|
||||
RpcFsmName.newBuilder()
|
||||
.setActorName(actorName)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void stopFsm(Context ctx, int node, String actorName) {
|
||||
post(ctx, node, "/actor/"+actorName+"/stop", "").blockingSubscribe();
|
||||
public void stopFsm(int node, String actorName) {
|
||||
stubPool.apiForNode(node).stopFsm(
|
||||
RpcFsmName.newBuilder()
|
||||
.setActorName(actorName)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void stopProcess(Context ctx, int node, String id) {
|
||||
post(ctx, node, "/process/" + id + "/stop", "").blockingSubscribe();
|
||||
public void stopProcess(int node, String id) {
|
||||
stubPool.apiForNode(node).stopProcess(
|
||||
RpcProcessId.newBuilder()
|
||||
.setProcessId(id)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
|
||||
public void triggerCrawl(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/process/crawl/" + fid, "").blockingSubscribe();
|
||||
public void triggerCrawl(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).triggerCrawl(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void triggerRecrawl(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/process/recrawl", fid).blockingSubscribe();
|
||||
public void triggerRecrawl(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).triggerRecrawl(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void triggerConvert(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/process/convert/" + fid.id(), "").blockingSubscribe();
|
||||
public void triggerConvert(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).triggerConvert(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void triggerConvertAndLoad(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/process/convert-load/" + fid.id(), "").blockingSubscribe();
|
||||
public void triggerConvertAndLoad(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).triggerConvertAndLoad(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void loadProcessedData(Context ctx, int node, LoadParameters ids) {
|
||||
post(ctx, node, "/process/load", ids).blockingSubscribe();
|
||||
public void loadProcessedData(int node, List<FileStorageId> ids) {
|
||||
stubPool.apiForNode(node).loadProcessedData(
|
||||
RpcFileStorageIds.newBuilder()
|
||||
.addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void calculateAdjacencies(Context ctx, int node) {
|
||||
post(ctx, node, "/process/adjacency-calculation", "").blockingSubscribe();
|
||||
public void calculateAdjacencies(int node) {
|
||||
stubPool.apiForNode(node).calculateAdjacencies(Empty.getDefaultInstance());
|
||||
}
|
||||
|
||||
public void sideloadEncyclopedia(Context ctx, int node, Path sourcePath, String baseUrl) {
|
||||
post(ctx, node,
|
||||
"/sideload/encyclopedia?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8) + "&baseUrl=" + URLEncoder.encode(baseUrl, StandardCharsets.UTF_8),
|
||||
"").blockingSubscribe();
|
||||
public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) {
|
||||
stubPool.apiForNode(node).sideloadEncyclopedia(
|
||||
RpcSideloadEncyclopedia.newBuilder()
|
||||
.setBaseUrl(baseUrl)
|
||||
.setSourcePath(sourcePath.toString())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void sideloadDirtree(Context ctx, int node, Path sourcePath) {
|
||||
post(ctx, node,
|
||||
"/sideload/dirtree?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8),
|
||||
"").blockingSubscribe();
|
||||
public void sideloadDirtree(int node, Path sourcePath) {
|
||||
stubPool.apiForNode(node).sideloadDirtree(
|
||||
RpcSideloadDirtree.newBuilder()
|
||||
.setSourcePath(sourcePath.toString())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void sideloadWarc(Context ctx, int node, Path sourcePath) {
|
||||
post(ctx, node,
|
||||
"/sideload/warc?path="+ URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8),
|
||||
"").blockingSubscribe();
|
||||
public void sideloadWarc(int node, Path sourcePath) {
|
||||
stubPool.apiForNode(node).sideloadWarc(
|
||||
RpcSideloadWarc.newBuilder()
|
||||
.setSourcePath(sourcePath.toString())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void sideloadStackexchange(Context ctx, int node, Path sourcePath) {
|
||||
post(ctx, node,
|
||||
"/sideload/stackexchange?path="+URLEncoder.encode(sourcePath.toString(), StandardCharsets.UTF_8),
|
||||
"").blockingSubscribe();
|
||||
public void sideloadStackexchange(int node, Path sourcePath) {
|
||||
stubPool.apiForNode(node).sideloadStackexchange(
|
||||
RpcSideloadStackexchange.newBuilder()
|
||||
.setSourcePath(sourcePath.toString())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void createCrawlSpecFromDownload(Context context, int node, String description, String url) {
|
||||
post(context, node, "/process/crawl-spec/from-download?description="+URLEncoder.encode(description, StandardCharsets.UTF_8)+"&url="+URLEncoder.encode(url, StandardCharsets.UTF_8), "")
|
||||
.blockingSubscribe();
|
||||
public void createCrawlSpecFromDownload(int node, String description, String url) {
|
||||
stubPool.apiForNode(node).createCrawlSpecFromDownload(
|
||||
RpcCrawlSpecFromDownload.newBuilder()
|
||||
.setDescription(description)
|
||||
.setUrl(url)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void exportAtags(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/export/atags?fid="+fid, "").blockingSubscribe();
|
||||
public void exportAtags(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).exportAtags(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
public void exportSampleData(Context ctx, int node, FileStorageId fid, int size, String name) {
|
||||
post(ctx, node, "/export/sample-data?fid="+fid+"&size="+size+"&name="+URLEncoder.encode(name, StandardCharsets.UTF_8), "").blockingSubscribe();
|
||||
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
|
||||
stubPool.apiForNode(node).exportSampleData(
|
||||
RpcExportSampleData.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.setSize(size)
|
||||
.setName(name)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void exportRssFeeds(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/export/feeds?fid="+fid, "").blockingSubscribe();
|
||||
public void exportRssFeeds(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).exportRssFeeds(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
public void exportTermFrequencies(Context ctx, int node, FileStorageId fid) {
|
||||
post(ctx, node, "/export/termfreq?fid="+fid, "").blockingSubscribe();
|
||||
public void exportTermFrequencies(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).exportTermFrequencies(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void downloadSampleData(Context ctx, int node, String sampleSet) {
|
||||
post(ctx, node, "/action/download-sample-data?set="+URLEncoder.encode(sampleSet, StandardCharsets.UTF_8), "").blockingSubscribe();
|
||||
public void downloadSampleData(int node, String sampleSet) {
|
||||
stubPool.apiForNode(node).downloadSampleData(
|
||||
RpcDownloadSampleData.newBuilder()
|
||||
.setSampleSet(sampleSet)
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public void exportData(Context ctx, int node) {
|
||||
post(ctx, node, "/export/data", "").blockingSubscribe();
|
||||
public void exportData(int node) {
|
||||
stubPool.apiForNode(node).exportData(Empty.getDefaultInstance());
|
||||
}
|
||||
|
||||
public void restoreBackup(Context context, int node, FileStorageId fid) {
|
||||
post(context, node, "/backup/" + fid + "/restore", "").blockingSubscribe();
|
||||
public void restoreBackup(int node, FileStorageId fid) {
|
||||
stubPool.apiForNode(node).restoreBackup(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fid.id())
|
||||
.build()
|
||||
);
|
||||
}
|
||||
|
||||
public ActorRunStates getActorStates(Context context, int node) {
|
||||
public ActorRunStates getActorStates(int node) {
|
||||
try {
|
||||
return get(context, node, "/actor", ActorRunStates.class).blockingFirst();
|
||||
var rs = stubPool.apiForNode(node).getActorStates(Empty.getDefaultInstance());
|
||||
var states = rs.getActorRunStatesList().stream()
|
||||
.map(r -> new ActorRunState(
|
||||
r.getActorName(),
|
||||
r.getState(),
|
||||
r.getActorDescription(),
|
||||
r.getStateDescription(),
|
||||
r.getTerminal(),
|
||||
r.getCanStart())
|
||||
)
|
||||
.toList();
|
||||
|
||||
return new ActorRunStates(node, states);
|
||||
}
|
||||
catch (RouteNotConfiguredException ex) {
|
||||
// node is down, return dummy data
|
||||
return new ActorRunStates(node, new ArrayList<>());
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to get actor states", ex);
|
||||
|
||||
// Return an empty list of states to avoid breaking the UI when a node is down
|
||||
return new ActorRunStates(node, List.of());
|
||||
}
|
||||
}
|
||||
|
||||
public UploadDirContents listSideloadDir(Context context, int node) {
|
||||
public UploadDirContents listSideloadDir(int node) {
|
||||
try {
|
||||
return get(context, node, "/sideload/", UploadDirContents.class).blockingFirst();
|
||||
var rs = stubPool.apiForNode(node).listSideloadDir(Empty.getDefaultInstance());
|
||||
var items = rs.getEntriesList().stream()
|
||||
.map(i -> new UploadDirItem(i.getName(), i.getLastModifiedTime(), i.getIsDirectory(), i.getSize()))
|
||||
.toList();
|
||||
return new UploadDirContents(rs.getPath(), items);
|
||||
}
|
||||
catch (RouteNotConfiguredException ex) {
|
||||
// node is down, return dummy data
|
||||
return new UploadDirContents("/", new ArrayList<>());
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to list sideload dir", ex);
|
||||
|
||||
// Return an empty list of items to avoid breaking the UI when a node is down
|
||||
return new UploadDirContents("", List.of());
|
||||
}
|
||||
}
|
||||
|
||||
public FileStorageContent listFileStorage(Context context, int node, FileStorageId fileId) {
|
||||
public FileStorageContent listFileStorage(int node, FileStorageId fileId) {
|
||||
try {
|
||||
return get(context, node, "/storage/"+fileId.id(), FileStorageContent.class).blockingFirst();
|
||||
var rs = stubPool.apiForNode(node).listFileStorage(
|
||||
RpcFileStorageId.newBuilder()
|
||||
.setFileStorageId(fileId.id())
|
||||
.build()
|
||||
);
|
||||
|
||||
return new FileStorageContent(rs.getEntriesList().stream()
|
||||
.map(e -> new FileStorageFile(e.getName(), e.getSize(), e.getLastModifiedTime()))
|
||||
.toList());
|
||||
}
|
||||
catch (RouteNotConfiguredException ex) {
|
||||
// node is down, return dummy data
|
||||
return new FileStorageContent(new ArrayList<>());
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to list file storage", ex);
|
||||
|
||||
// Return an empty list of items to avoid breaking the UI when a node is down
|
||||
return new FileStorageContent(List.of());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
package nu.marginalia.executor.client;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import jakarta.inject.Singleton;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import com.google.gson.Gson;
|
||||
import nu.marginalia.mq.MqMessage;
|
||||
|
@ -1,10 +0,0 @@
|
||||
package nu.marginalia.executor.model.load;
|
||||
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public record LoadParameters(
|
||||
List<FileStorageId> ids
|
||||
) {
|
||||
}
|
@ -1,29 +1,9 @@
|
||||
package nu.marginalia.executor.upload;
|
||||
|
||||
import lombok.SneakyThrows;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
public record UploadDirItem (
|
||||
String name,
|
||||
String lastModifiedTime,
|
||||
boolean isDirectory,
|
||||
long size
|
||||
) {
|
||||
|
||||
@SneakyThrows
|
||||
public static UploadDirItem fromPath(Path path) {
|
||||
boolean isDir = Files.isDirectory(path);
|
||||
long size = isDir ? 0 : Files.size(path);
|
||||
var mtime = Files.getLastModifiedTime(path);
|
||||
|
||||
|
||||
return new UploadDirItem(path.toString(),
|
||||
LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME), isDir, size);
|
||||
}
|
||||
|
||||
}
|
||||
|
105
code/api/executor-api/src/main/protobuf/executor-api.proto
Normal file
105
code/api/executor-api/src/main/protobuf/executor-api.proto
Normal file
@ -0,0 +1,105 @@
|
||||
syntax="proto3";
|
||||
package actorapi;
|
||||
|
||||
option java_package="nu.marginalia.executor.api";
|
||||
option java_multiple_files=true;
|
||||
|
||||
service ExecutorApi {
|
||||
rpc startFsm(RpcFsmName) returns (Empty) {}
|
||||
rpc stopFsm(RpcFsmName) returns (Empty) {}
|
||||
|
||||
rpc stopProcess(RpcProcessId) returns (Empty) {}
|
||||
|
||||
rpc triggerCrawl(RpcFileStorageId) returns (Empty) {}
|
||||
rpc triggerRecrawl(RpcFileStorageId) returns (Empty) {}
|
||||
rpc triggerConvert(RpcFileStorageId) returns (Empty) {}
|
||||
rpc triggerConvertAndLoad(RpcFileStorageId) returns (Empty) {}
|
||||
rpc loadProcessedData(RpcFileStorageIds) returns (Empty) {}
|
||||
rpc calculateAdjacencies(Empty) returns (Empty) {}
|
||||
|
||||
rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {}
|
||||
rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {}
|
||||
rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {}
|
||||
rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {}
|
||||
|
||||
rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {}
|
||||
rpc exportAtags(RpcFileStorageId) returns (Empty) {}
|
||||
rpc exportSampleData(RpcExportSampleData) returns (Empty) {}
|
||||
rpc exportRssFeeds(RpcFileStorageId) returns (Empty) {}
|
||||
rpc exportTermFrequencies(RpcFileStorageId) returns (Empty) {}
|
||||
rpc downloadSampleData(RpcDownloadSampleData) returns (Empty) {}
|
||||
rpc exportData(Empty) returns (Empty) {}
|
||||
rpc restoreBackup(RpcFileStorageId) returns (Empty) {}
|
||||
rpc getActorStates(Empty) returns (RpcActorRunStates) {}
|
||||
rpc listSideloadDir(Empty) returns (RpcUploadDirContents) {}
|
||||
rpc listFileStorage(RpcFileStorageId) returns (RpcFileStorageContent) {}
|
||||
}
|
||||
|
||||
message Empty {}
|
||||
message RpcFsmName {
|
||||
string actorName = 1;
|
||||
}
|
||||
message RpcProcessId {
|
||||
string processId = 1;
|
||||
}
|
||||
message RpcFileStorageId {
|
||||
int64 fileStorageId = 1;
|
||||
}
|
||||
message RpcFileStorageIds {
|
||||
repeated int64 fileStorageIds = 1;
|
||||
}
|
||||
message RpcSideloadEncyclopedia {
|
||||
string sourcePath = 1;
|
||||
string baseUrl = 2;
|
||||
}
|
||||
message RpcSideloadDirtree {
|
||||
string sourcePath = 1;
|
||||
}
|
||||
message RpcSideloadWarc {
|
||||
string sourcePath = 1;
|
||||
}
|
||||
message RpcSideloadStackexchange {
|
||||
string sourcePath = 1;
|
||||
}
|
||||
message RpcCrawlSpecFromDownload {
|
||||
string description = 1;
|
||||
string url = 2;
|
||||
}
|
||||
message RpcExportSampleData {
|
||||
int64 fileStorageId = 1;
|
||||
int32 size = 2;
|
||||
string name = 3;
|
||||
}
|
||||
message RpcDownloadSampleData {
|
||||
string sampleSet = 1;
|
||||
}
|
||||
message RpcActorRunStates {
|
||||
int32 node = 1;
|
||||
repeated RpcActorRunState actorRunStates = 2;
|
||||
}
|
||||
message RpcActorRunState {
|
||||
string actorName = 1;
|
||||
string state = 2;
|
||||
string actorDescription = 3;
|
||||
string stateDescription = 4;
|
||||
bool terminal = 5;
|
||||
bool canStart = 6;
|
||||
}
|
||||
message RpcUploadDirContents {
|
||||
string path = 1;
|
||||
repeated RpcUploadDirEntry entries = 2;
|
||||
}
|
||||
message RpcUploadDirEntry {
|
||||
string name = 1;
|
||||
string lastModifiedTime = 2;
|
||||
bool isDirectory = 3;
|
||||
int64 size = 4;
|
||||
}
|
||||
message RpcFileStorageContent {
|
||||
repeated RpcFileStorageEntry entries = 1;
|
||||
}
|
||||
message RpcFileStorageEntry {
|
||||
string name = 1;
|
||||
int64 size = 2;
|
||||
string lastModifiedTime = 3;
|
||||
}
|
@ -17,6 +17,9 @@ sourceSets {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
apply from: "$rootProject.projectDir/protobuf.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:config')
|
||||
@ -42,30 +45,5 @@ dependencies {
|
||||
testImplementation libs.mockito
|
||||
}
|
||||
|
||||
protobuf {
|
||||
protoc {
|
||||
if (osdetector.os == "osx") {
|
||||
artifact = "com.google.protobuf:protoc:3.0.2:osx-x86_64"
|
||||
} else {
|
||||
artifact = "com.google.protobuf:protoc:3.0.2"
|
||||
}
|
||||
}
|
||||
plugins {
|
||||
grpc {
|
||||
if (osdetector.os == "osx") {
|
||||
artifact = "io.grpc:protoc-gen-grpc-java:1.1.2:osx-x86_64"
|
||||
} else {
|
||||
artifact = "io.grpc:protoc-gen-grpc-java:1.1.2"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
generateProtoTasks {
|
||||
all().each { task ->
|
||||
task.plugins {
|
||||
grpc {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -20,7 +20,6 @@ import nu.marginalia.service.id.ServiceId;
|
||||
import java.util.List;
|
||||
|
||||
import javax.annotation.CheckReturnValue;
|
||||
import java.util.ServiceConfigurationError;
|
||||
import java.util.UUID;
|
||||
|
||||
@Singleton
|
||||
@ -39,7 +38,7 @@ public class IndexClient extends AbstractDynamicClient {
|
||||
super(descriptors.forId(ServiceId.Index), GsonFactory::get);
|
||||
this.messageQueueFactory = messageQueueFactory;
|
||||
|
||||
String inboxName = ServiceId.Index.name;
|
||||
String inboxName = ServiceId.Index.serviceName;
|
||||
String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
|
||||
outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID());
|
||||
setTimeout(30);
|
||||
|
@ -4,7 +4,7 @@ public record FileStorageId(long id) {
|
||||
public static FileStorageId parse(String str) {
|
||||
return new FileStorageId(Long.parseLong(str));
|
||||
}
|
||||
public static FileStorageId of(int storageId) {
|
||||
public static FileStorageId of(long storageId) {
|
||||
return new FileStorageId(storageId);
|
||||
}
|
||||
|
||||
|
@ -24,6 +24,7 @@ dependencies {
|
||||
implementation libs.bundles.httpcomponents
|
||||
|
||||
implementation libs.bundles.gson
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.protobuf
|
||||
|
||||
implementation libs.bundles.prometheus
|
||||
|
@ -114,7 +114,7 @@ public class ServiceMonitors {
|
||||
|
||||
public boolean isServiceUp(ServiceId serviceId, int node) {
|
||||
synchronized (runningServices) {
|
||||
return runningServices.contains(new ServiceNode(serviceId.name, node));
|
||||
return runningServices.contains(new ServiceNode(serviceId.serviceName, node));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,8 +1,8 @@
|
||||
package nu.marginalia.query;
|
||||
package nu.marginalia.client.grpc;
|
||||
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -10,26 +10,34 @@ import java.util.concurrent.*;
|
||||
import java.util.function.Function;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
public abstract class QueryGrpcStubPool<STUB> {
|
||||
/** A pool of gRPC stubs for a service, with a separate stub for each node.
|
||||
* Manages broadcast-style request. */
|
||||
public abstract class GrpcStubPool<STUB> {
|
||||
public GrpcStubPool(String serviceName) {
|
||||
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
protected record ServiceAndNode(String service, int node) {
|
||||
public String getHostName() {
|
||||
return service+"-"+node;
|
||||
}
|
||||
}
|
||||
|
||||
private final NodeConfigurationWatcher nodeConfigurationWatcher;
|
||||
private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>();
|
||||
private final Map<ServiceAndNode, STUB> actorRpcApis = new ConcurrentHashMap<>();
|
||||
private final Map<ServiceAndNode, STUB> apis = new ConcurrentHashMap<>();
|
||||
private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
|
||||
|
||||
QueryGrpcStubPool(NodeConfigurationWatcher nodeConfigurationWatcher) {
|
||||
this.nodeConfigurationWatcher = nodeConfigurationWatcher;
|
||||
private final String serviceName;
|
||||
|
||||
public GrpcStubPool(ServiceId serviceId) {
|
||||
this.serviceName = serviceId.serviceName;
|
||||
}
|
||||
|
||||
/** Get an API stub for the given node */
|
||||
public STUB indexApi(int node) {
|
||||
var san = new ServiceAndNode("index-service", node);
|
||||
return actorRpcApis.computeIfAbsent(san, n ->
|
||||
public STUB apiForNode(int node) {
|
||||
var san = new ServiceAndNode(serviceName, node);
|
||||
return apis.computeIfAbsent(san, n ->
|
||||
createStub(channels.computeIfAbsent(san, this::createChannel))
|
||||
);
|
||||
}
|
||||
@ -41,8 +49,8 @@ public abstract class QueryGrpcStubPool<STUB> {
|
||||
/** Invoke a function on each node, returning a list of futures in a terminal state, as per
|
||||
* ExecutorService$invokeAll */
|
||||
public <T> List<Future<T>> invokeAll(Function<STUB, Callable<T>> callF) throws InterruptedException {
|
||||
List<Callable<T>> calls = nodeConfigurationWatcher.getQueryNodes().stream()
|
||||
.map(id -> callF.apply(indexApi(id)))
|
||||
List<Callable<T>> calls = getEligibleNodes().stream()
|
||||
.map(id -> callF.apply(apiForNode(id)))
|
||||
.toList();
|
||||
|
||||
return virtualExecutorService.invokeAll(calls);
|
||||
@ -50,8 +58,8 @@ public abstract class QueryGrpcStubPool<STUB> {
|
||||
|
||||
/** Invoke a function on each node, returning a stream of results */
|
||||
public <T> Stream<T> callEachSequential(Function<STUB, T> call) {
|
||||
return nodeConfigurationWatcher.getQueryNodes().stream()
|
||||
.map(id -> call.apply(indexApi(id)));
|
||||
return getEligibleNodes().stream()
|
||||
.map(id -> call.apply(apiForNode(id)));
|
||||
}
|
||||
|
||||
|
||||
@ -61,4 +69,7 @@ public abstract class QueryGrpcStubPool<STUB> {
|
||||
*/
|
||||
public abstract STUB createStub(ManagedChannel channel);
|
||||
|
||||
/** Get the list of nodes that are eligible for broadcast-style requests */
|
||||
public abstract List<Integer> getEligibleNodes();
|
||||
|
||||
}
|
@ -8,7 +8,7 @@ public class ServiceDescriptor {
|
||||
|
||||
public ServiceDescriptor(ServiceId id) {
|
||||
this.id = id;
|
||||
this.name = id.name;
|
||||
this.name = id.serviceName;
|
||||
}
|
||||
|
||||
public ServiceDescriptor(ServiceId id, String host) {
|
||||
|
@ -14,18 +14,19 @@ public enum ServiceId {
|
||||
Dating("dating-service"),
|
||||
Explorer("explorer-service");
|
||||
|
||||
public final String name;
|
||||
ServiceId(String name) {
|
||||
this.name = name;
|
||||
public final String serviceName;
|
||||
|
||||
ServiceId(String serviceName) {
|
||||
this.serviceName = serviceName;
|
||||
}
|
||||
|
||||
public String withNode(int node) {
|
||||
return name + ":" + node;
|
||||
return serviceName + ":" + node;
|
||||
}
|
||||
|
||||
public static ServiceId byName(String name) {
|
||||
for (ServiceId id : values()) {
|
||||
if (id.name.equals(name)) {
|
||||
if (id.serviceName.equals(name)) {
|
||||
return id;
|
||||
}
|
||||
}
|
||||
|
@ -51,10 +51,10 @@ public abstract class MainClass {
|
||||
protected static void init(ServiceId id, String... args) {
|
||||
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
|
||||
System.setProperty("isThreadContextMapInheritable", "true");
|
||||
System.setProperty("service-name", id.name);
|
||||
System.setProperty("service-name", id.serviceName);
|
||||
|
||||
ConfigLoader.loadConfig(
|
||||
ConfigLoader.getConfigPath(id.name)
|
||||
ConfigLoader.getConfigPath(id.serviceName)
|
||||
);
|
||||
|
||||
initJdbc();
|
||||
|
@ -22,6 +22,6 @@ public record ServiceConfiguration(ServiceId serviceId,
|
||||
int metricsPort,
|
||||
UUID instanceUuid) {
|
||||
public String serviceName() {
|
||||
return serviceId.name;
|
||||
return serviceId.serviceName;
|
||||
}
|
||||
}
|
||||
|
@ -1,7 +1,7 @@
|
||||
package nu.marginalia.service.server;
|
||||
|
||||
import com.google.inject.name.Named;
|
||||
import jakarta.inject.Inject;
|
||||
import com.google.inject.Inject;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
|
@ -1,6 +1,6 @@
|
||||
package nu.marginalia.control;
|
||||
|
||||
import jakarta.inject.Inject;
|
||||
import com.google.inject.Inject;
|
||||
import spark.ResponseTransformer;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -2,11 +2,9 @@ package nu.marginalia.control.node.svc;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.control.ControlValidationError;
|
||||
import nu.marginalia.control.RedirectControl;
|
||||
import nu.marginalia.executor.client.ExecutorClient;
|
||||
import nu.marginalia.executor.model.load.LoadParameters;
|
||||
import nu.marginalia.index.client.IndexClient;
|
||||
import nu.marginalia.service.control.ServiceEventLog;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
@ -106,7 +104,7 @@ public class ControlNodeActionsService {
|
||||
if (!Set.of("sample-s", "sample-m", "sample-l", "sample-xl").contains(set))
|
||||
throw new ControlValidationError("Invalid sample specified", "A valid sample data set must be specified", "..");
|
||||
|
||||
executorClient.downloadSampleData(Context.fromRequest(request), Integer.parseInt(request.params("node")), set);
|
||||
executorClient.downloadSampleData(Integer.parseInt(request.params("node")), set);
|
||||
|
||||
logger.info("Downloading sample data set {}", set);
|
||||
|
||||
@ -126,7 +124,7 @@ public class ControlNodeActionsService {
|
||||
|
||||
eventLog.logEvent("USER-ACTION", "SIDELOAD ENCYCLOPEDIA " + nodeId);
|
||||
|
||||
executorClient.sideloadEncyclopedia(Context.fromRequest(request), nodeId, sourcePath, baseUrl);
|
||||
executorClient.sideloadEncyclopedia(nodeId, sourcePath, baseUrl);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -139,7 +137,7 @@ public class ControlNodeActionsService {
|
||||
|
||||
eventLog.logEvent("USER-ACTION", "SIDELOAD DIRTREE " + nodeId);
|
||||
|
||||
executorClient.sideloadDirtree(Context.fromRequest(request), nodeId, sourcePath);
|
||||
executorClient.sideloadDirtree(nodeId, sourcePath);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -151,7 +149,7 @@ public class ControlNodeActionsService {
|
||||
|
||||
eventLog.logEvent("USER-ACTION", "SIDELOAD WARC " + nodeId);
|
||||
|
||||
executorClient.sideloadWarc(Context.fromRequest(request), nodeId, sourcePath);
|
||||
executorClient.sideloadWarc(nodeId, sourcePath);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -166,7 +164,8 @@ public class ControlNodeActionsService {
|
||||
|
||||
eventLog.logEvent("USER-ACTION", "SIDELOAD STACKEXCHANGE " + nodeId);
|
||||
|
||||
executorClient.sideloadStackexchange(Context.fromRequest(request), nodeId, sourcePath);
|
||||
executorClient.sideloadStackexchange(nodeId, sourcePath);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
@ -184,7 +183,6 @@ public class ControlNodeActionsService {
|
||||
changeActiveStorage(nodeId, FileStorageType.CRAWL_DATA, toCrawl);
|
||||
|
||||
executorClient.triggerRecrawl(
|
||||
Context.fromRequest(request),
|
||||
nodeId,
|
||||
toCrawl
|
||||
);
|
||||
@ -199,11 +197,7 @@ public class ControlNodeActionsService {
|
||||
|
||||
changeActiveStorage(nodeId, FileStorageType.CRAWL_SPEC, toCrawl);
|
||||
|
||||
executorClient.triggerCrawl(
|
||||
Context.fromRequest(request),
|
||||
nodeId,
|
||||
toCrawl
|
||||
);
|
||||
executorClient.triggerCrawl(nodeId, toCrawl);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -216,14 +210,10 @@ public class ControlNodeActionsService {
|
||||
changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, toProcess);
|
||||
|
||||
if (isAutoload) {
|
||||
executorClient.triggerConvertAndLoad(Context.fromRequest(request),
|
||||
nodeId,
|
||||
toProcess);
|
||||
executorClient.triggerConvertAndLoad(nodeId, toProcess);
|
||||
}
|
||||
else {
|
||||
executorClient.triggerConvert(Context.fromRequest(request),
|
||||
nodeId,
|
||||
toProcess);
|
||||
executorClient.triggerConvert(nodeId, toProcess);
|
||||
}
|
||||
|
||||
return "";
|
||||
@ -241,10 +231,7 @@ public class ControlNodeActionsService {
|
||||
|
||||
changeActiveStorage(nodeId, FileStorageType.PROCESSED_DATA, ids.toArray(new FileStorageId[0]));
|
||||
|
||||
executorClient.loadProcessedData(Context.fromRequest(request),
|
||||
nodeId,
|
||||
new LoadParameters(ids)
|
||||
);
|
||||
executorClient.loadProcessedData(nodeId, ids);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -254,7 +241,7 @@ public class ControlNodeActionsService {
|
||||
|
||||
var toLoad = parseSourceFileStorageId(request.queryParams("source"));
|
||||
|
||||
executorClient.restoreBackup(Context.fromRequest(request), nodeId, toLoad);
|
||||
executorClient.restoreBackup(nodeId, toLoad);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -286,13 +273,13 @@ public class ControlNodeActionsService {
|
||||
throw new ControlValidationError("No url specified", "A url must be specified", "..");
|
||||
}
|
||||
|
||||
executorClient.createCrawlSpecFromDownload(Context.fromRequest(request), nodeId, description, url);
|
||||
executorClient.createCrawlSpecFromDownload(nodeId, description, url);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
private Object exportDbData(Request req, Response rsp) {
|
||||
executorClient.exportData(Context.fromRequest(req), Integer.parseInt(req.params("id")));
|
||||
executorClient.exportData(Integer.parseInt(req.params("id")));
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -302,9 +289,9 @@ public class ControlNodeActionsService {
|
||||
FileStorageId source = parseSourceFileStorageId(req.queryParams("source"));
|
||||
|
||||
switch (exportType) {
|
||||
case "atags" -> executorClient.exportAtags(Context.fromRequest(req), Integer.parseInt(req.params("id")), source);
|
||||
case "rss" -> executorClient.exportRssFeeds(Context.fromRequest(req), Integer.parseInt(req.params("id")), source);
|
||||
case "termFreq" -> executorClient.exportTermFrequencies(Context.fromRequest(req), Integer.parseInt(req.params("id")), source);
|
||||
case "atags" -> executorClient.exportAtags(Integer.parseInt(req.params("id")), source);
|
||||
case "rss" -> executorClient.exportRssFeeds(Integer.parseInt(req.params("id")), source);
|
||||
case "termFreq" -> executorClient.exportTermFrequencies(Integer.parseInt(req.params("id")), source);
|
||||
default -> throw new ControlValidationError("No export type specified", "An export type must be specified", "..");
|
||||
}
|
||||
|
||||
@ -316,7 +303,7 @@ public class ControlNodeActionsService {
|
||||
int size = Integer.parseInt(req.queryParams("size"));
|
||||
String name = req.queryParams("name");
|
||||
|
||||
executorClient.exportSampleData(Context.fromRequest(req), Integer.parseInt(req.params("id")), source, size, name);
|
||||
executorClient.exportSampleData(Integer.parseInt(req.params("id")), source, size, name);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
@ -107,13 +107,13 @@ public class ControlNodeService {
|
||||
}
|
||||
|
||||
public Object startFsm(Request req, Response rsp) throws Exception {
|
||||
executorClient.startFsm(Context.fromRequest(req), Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase());
|
||||
executorClient.startFsm(Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase());
|
||||
|
||||
return redirectToOverview(req);
|
||||
}
|
||||
|
||||
public Object stopFsm(Request req, Response rsp) throws Exception {
|
||||
executorClient.stopFsm(Context.fromRequest(req), Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase());
|
||||
executorClient.stopFsm(Integer.parseInt(req.params("id")), req.params("fsm").toUpperCase());
|
||||
|
||||
return redirectToOverview(req);
|
||||
}
|
||||
@ -132,7 +132,7 @@ public class ControlNodeService {
|
||||
int nodeId = Integer.parseInt(request.params("id"));
|
||||
String processBase = request.params("processBase");
|
||||
|
||||
executorClient.stopProcess(Context.fromRequest(request), nodeId, processBase);
|
||||
executorClient.stopProcess(nodeId, processBase);
|
||||
|
||||
return "";
|
||||
}
|
||||
@ -153,7 +153,7 @@ public class ControlNodeService {
|
||||
return Map.of(
|
||||
"tab", Map.of("actors", true),
|
||||
"node", nodeConfigurationService.get(nodeId),
|
||||
"actors", executorClient.getActorStates(Context.fromRequest(request), nodeId).states()
|
||||
"actors", executorClient.getActorStates(nodeId).states()
|
||||
);
|
||||
}
|
||||
|
||||
@ -164,7 +164,7 @@ public class ControlNodeService {
|
||||
"tab", Map.of("actions", true),
|
||||
"node", nodeConfigurationService.get(nodeId),
|
||||
"view", Map.of(request.queryParams("view"), true),
|
||||
"uploadDirContents", executorClient.listSideloadDir(Context.fromRequest(request), nodeId),
|
||||
"uploadDirContents", executorClient.listSideloadDir(nodeId),
|
||||
"allBackups",
|
||||
fileStorageService.getEachFileStorage(nodeId, FileStorageType.BACKUP),
|
||||
"allCrawlData",
|
||||
@ -279,9 +279,9 @@ public class ControlNodeService {
|
||||
int nodeId = Integer.parseInt(request.params("id"));
|
||||
var config = nodeConfigurationService.get(nodeId);
|
||||
|
||||
var actors = executorClient.getActorStates(Context.fromRequest(request), nodeId).states();
|
||||
|
||||
actors.removeIf(actor -> actor.state().equals("MONITOR"));
|
||||
var actors = executorClient.getActorStates(nodeId).states()
|
||||
.stream().filter(actor -> !actor.state().equals("MONITOR"))
|
||||
.toList();
|
||||
|
||||
return Map.of(
|
||||
"node", nodeConfigurationService.get(nodeId),
|
||||
@ -308,7 +308,7 @@ public class ControlNodeService {
|
||||
}
|
||||
|
||||
private List<EventLogEntry> getEvents(int nodeId) {
|
||||
List<String> services = List.of(ServiceId.Index.name+":"+nodeId, ServiceId.Executor.name+":"+nodeId);
|
||||
List<String> services = List.of(ServiceId.Index.serviceName +":"+nodeId, ServiceId.Executor.serviceName +":"+nodeId);
|
||||
List<EventLogEntry> events = new ArrayList<>(20);
|
||||
for (var service :services) {
|
||||
events.addAll(eventLogService.getLastEntriesForService(service, Long.MAX_VALUE, 10));
|
||||
@ -399,7 +399,7 @@ public class ControlNodeService {
|
||||
|
||||
List<FileStorageFileModel> files = new ArrayList<>();
|
||||
|
||||
for (var execFile : executorClient.listFileStorage(context, node, fileId).files()) {
|
||||
for (var execFile : executorClient.listFileStorage(node, fileId).files()) {
|
||||
files.add(new FileStorageFileModel(
|
||||
execFile.name(),
|
||||
execFile.modTime(),
|
||||
|
@ -2,7 +2,6 @@ package nu.marginalia.control.sys.svc;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.control.ControlRendererFactory;
|
||||
import nu.marginalia.control.Redirects;
|
||||
import nu.marginalia.control.actor.ControlActor;
|
||||
@ -56,7 +55,7 @@ public class ControlSysActionsService {
|
||||
* and lacks a proper internal API
|
||||
*/
|
||||
private MqOutbox createApiOutbox(MessageQueueFactory mqFactory) {
|
||||
String inboxName = ServiceId.Api.name + ":" + "0";
|
||||
String inboxName = ServiceId.Api.serviceName + ":" + "0";
|
||||
String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
|
||||
return mqFactory.createOutbox(inboxName, 0, outboxName, 0, UUID.randomUUID());
|
||||
}
|
||||
@ -119,7 +118,7 @@ public class ControlSysActionsService {
|
||||
// This is technically not a partitioned operation, but we execute it at node 1
|
||||
// and let the effects be global :-)
|
||||
|
||||
executorClient.calculateAdjacencies(Context.fromRequest(request), 1);
|
||||
executorClient.calculateAdjacencies(1);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
@ -51,6 +51,7 @@ dependencies {
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
implementation libs.spark
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.gson
|
||||
implementation libs.prometheus
|
||||
implementation libs.notnull
|
||||
|
@ -2,17 +2,21 @@ package nu.marginalia.actor;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.actor.proc.ProcessLivenessMonitorActor;
|
||||
import nu.marginalia.actor.state.ActorStateInstance;
|
||||
import nu.marginalia.executor.api.RpcActorRunState;
|
||||
import nu.marginalia.executor.api.RpcActorRunStates;
|
||||
import nu.marginalia.executor.api.RpcFsmName;
|
||||
import nu.marginalia.executor.api.RpcProcessId;
|
||||
import nu.marginalia.mq.MqMessageState;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.process.ProcessService;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Spark;
|
||||
|
||||
import java.util.Comparator;
|
||||
|
||||
@Singleton
|
||||
public class ActorApi {
|
||||
private final ExecutorActorControlService actors;
|
||||
@ -32,33 +36,20 @@ public class ActorApi {
|
||||
this.serviceConfiguration = serviceConfiguration;
|
||||
}
|
||||
|
||||
public Object startActorFromState(Request request, Response response) throws Exception {
|
||||
ExecutorActor actor = translateActor(request.params("id"));
|
||||
String state = request.params("state");
|
||||
|
||||
actors.startFromJSON(actor, state, request.body());
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object startActor(Request request, Response response) throws Exception {
|
||||
ExecutorActor actor = translateActor(request.params("id"));
|
||||
public void startActor(RpcFsmName actorName) throws Exception {
|
||||
ExecutorActor actor = translateActor(actorName.getActorName());
|
||||
|
||||
actors.start(actor);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object stopActor(Request request, Response response) {
|
||||
ExecutorActor actor = translateActor(request.params("id"));
|
||||
|
||||
public void stopActor(RpcFsmName actorName) {
|
||||
ExecutorActor actor = translateActor(actorName.getActorName());
|
||||
actors.stop(actor);
|
||||
|
||||
return "OK";
|
||||
}
|
||||
|
||||
public Object stopProcess(Request request, Response response) {
|
||||
ProcessService.ProcessId id = ProcessService.translateExternalIdBase(request.params("id"));
|
||||
public Object stopProcess(RpcProcessId processId) {
|
||||
ProcessService.ProcessId id = ProcessService.translateExternalIdBase(processId.getProcessId());
|
||||
|
||||
try {
|
||||
String inbox = id.name().toLowerCase() + ":" + serviceConfiguration.node();
|
||||
@ -84,6 +75,45 @@ public class ActorApi {
|
||||
return "OK";
|
||||
}
|
||||
|
||||
|
||||
public RpcActorRunStates getActorStates() {
|
||||
var items = actors.getActorStates().entrySet().stream().map(e -> {
|
||||
final var stateGraph = actors.getActorDefinition(e.getKey());
|
||||
|
||||
final ActorStateInstance state = e.getValue();
|
||||
final String actorDescription = stateGraph.describe();
|
||||
|
||||
final String machineName = e.getKey().name();
|
||||
final String stateName = state.name();
|
||||
|
||||
final String stateDescription = "";
|
||||
|
||||
final boolean terminal = state.isFinal();
|
||||
final boolean canStart = actors.isDirectlyInitializable(e.getKey()) && terminal;
|
||||
|
||||
return RpcActorRunState
|
||||
.newBuilder()
|
||||
.setActorName(machineName)
|
||||
.setState(stateName)
|
||||
.setActorDescription(actorDescription)
|
||||
.setStateDescription(stateDescription)
|
||||
.setTerminal(terminal)
|
||||
.setCanStart(canStart)
|
||||
.build();
|
||||
|
||||
})
|
||||
.filter(s -> !s.getTerminal() || s.getCanStart())
|
||||
.sorted(Comparator.comparing(RpcActorRunState::getActorName))
|
||||
.toList();
|
||||
|
||||
return RpcActorRunStates.newBuilder()
|
||||
.setNode(serviceConfiguration.node())
|
||||
.addAllActorRunStates(items)
|
||||
.build();
|
||||
|
||||
}
|
||||
|
||||
|
||||
public ExecutorActor translateActor(String name) {
|
||||
try {
|
||||
return ExecutorActor.valueOf(name.toUpperCase());
|
||||
|
@ -1,7 +1,7 @@
|
||||
package nu.marginalia.actor;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import jakarta.inject.Singleton;
|
||||
import com.google.inject.Singleton;
|
||||
import nu.marginalia.actor.state.ActorStateInstance;
|
||||
import nu.marginalia.actor.state.ActorStep;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
|
@ -0,0 +1,313 @@
|
||||
package nu.marginalia.executor;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import nu.marginalia.actor.ActorApi;
|
||||
import nu.marginalia.executor.api.*;
|
||||
import nu.marginalia.executor.svc.*;
|
||||
|
||||
public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase {
|
||||
private final ActorApi actorApi;
|
||||
private final ExportService exportService;
|
||||
private final SideloadService sideloadService;
|
||||
private final BackupService backupService;
|
||||
private final TransferService transferService;
|
||||
private final ProcessingService processingService;
|
||||
|
||||
@Inject
|
||||
public ExecutorGrpcService(ActorApi actorApi,
|
||||
ExportService exportService,
|
||||
SideloadService sideloadService,
|
||||
BackupService backupService,
|
||||
TransferService transferService,
|
||||
ProcessingService processingService)
|
||||
{
|
||||
this.actorApi = actorApi;
|
||||
this.exportService = exportService;
|
||||
this.sideloadService = sideloadService;
|
||||
this.backupService = backupService;
|
||||
this.transferService = transferService;
|
||||
this.processingService = processingService;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startFsm(RpcFsmName request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
actorApi.startActor(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopFsm(RpcFsmName request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
actorApi.stopActor(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopProcess(RpcProcessId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
actorApi.stopProcess(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerCrawl(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.startCrawl(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerRecrawl(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.startRecrawl(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerConvert(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.startConversion(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void triggerConvertAndLoad(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.startConvertLoad(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void loadProcessedData(RpcFileStorageIds request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.startLoad(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void calculateAdjacencies(Empty request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.startAdjacencyCalculation();
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sideloadEncyclopedia(RpcSideloadEncyclopedia request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
sideloadService.sideloadEncyclopedia(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sideloadDirtree(RpcSideloadDirtree request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
sideloadService.sideloadDirtree(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sideloadWarc(RpcSideloadWarc request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
sideloadService.sideloadWarc(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void sideloadStackexchange(RpcSideloadStackexchange request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
sideloadService.sideloadStackexchange(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
processingService.createCrawlSpecFromDownload(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exportAtags(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
exportService.exportAtags(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exportSampleData(RpcExportSampleData request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
exportService.exportSampleData(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exportRssFeeds(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
exportService.exportFeeds(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exportTermFrequencies(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
exportService.exportTermFrequencies(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void downloadSampleData(RpcDownloadSampleData request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
sideloadService.downloadSampleData(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void exportData(Empty request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
exportService.exportData();
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void restoreBackup(RpcFileStorageId request, StreamObserver<Empty> responseObserver) {
|
||||
try {
|
||||
backupService.restore(request);
|
||||
responseObserver.onNext(Empty.getDefaultInstance());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void getActorStates(Empty request, StreamObserver<RpcActorRunStates> responseObserver) {
|
||||
responseObserver.onNext(actorApi.getActorStates());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listSideloadDir(Empty request, StreamObserver<RpcUploadDirContents> responseObserver) {
|
||||
try {
|
||||
responseObserver.onNext(sideloadService.listUploadDir());
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void listFileStorage(RpcFileStorageId request, StreamObserver<RpcFileStorageContent> responseObserver) {
|
||||
try {
|
||||
responseObserver.onNext(transferService.listFiles(request));
|
||||
responseObserver.onCompleted();
|
||||
}
|
||||
catch (Exception e) {
|
||||
responseObserver.onError(e);
|
||||
}
|
||||
}
|
||||
}
|
@ -2,27 +2,19 @@ package nu.marginalia.executor;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.inject.Inject;
|
||||
import io.grpc.ServerBuilder;
|
||||
import nu.marginalia.actor.ExecutorActor;
|
||||
import nu.marginalia.actor.ActorApi;
|
||||
import nu.marginalia.actor.ExecutorActorControlService;
|
||||
import nu.marginalia.actor.state.ActorStateInstance;
|
||||
import nu.marginalia.executor.model.ActorRunState;
|
||||
import nu.marginalia.executor.model.ActorRunStates;
|
||||
import nu.marginalia.executor.svc.*;
|
||||
import nu.marginalia.executor.svc.TransferService;
|
||||
import nu.marginalia.service.server.BaseServiceParams;
|
||||
import nu.marginalia.service.server.Service;
|
||||
import nu.marginalia.service.server.mq.MqRequest;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Spark;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.sql.SQLException;
|
||||
import java.util.Comparator;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
|
||||
public class ExecutorSvc extends Service {
|
||||
@ -36,51 +28,19 @@ public class ExecutorSvc extends Service {
|
||||
@Inject
|
||||
public ExecutorSvc(BaseServiceParams params,
|
||||
ExecutorActorControlService actorControlService,
|
||||
ProcessingService processingService,
|
||||
SideloadService sideloadService,
|
||||
BackupService backupService,
|
||||
ExportService exportService,
|
||||
ExecutorGrpcService executorGrpcService,
|
||||
Gson gson,
|
||||
TransferService transferService,
|
||||
ActorApi actorApi) {
|
||||
TransferService transferService) throws IOException {
|
||||
super(params);
|
||||
this.params = params;
|
||||
this.gson = gson;
|
||||
this.actorControlService = actorControlService;
|
||||
this.transferService = transferService;
|
||||
|
||||
Spark.post("/actor/:id/start", actorApi::startActor);
|
||||
Spark.post("/actor/:id/start/:state", actorApi::startActorFromState);
|
||||
Spark.post("/actor/:id/stop", actorApi::stopActor);
|
||||
|
||||
Spark.get("/actor", this::getActorStates, gson::toJson);
|
||||
|
||||
Spark.post("/process/:id/stop", actorApi::stopProcess);
|
||||
|
||||
Spark.post("/process/crawl/:fid", processingService::startCrawl);
|
||||
Spark.post("/process/recrawl", processingService::startRecrawl);
|
||||
Spark.post("/process/convert/:fid", processingService::startConversion);
|
||||
Spark.post("/process/convert-load/:fid", processingService::startConvertLoad);
|
||||
Spark.post("/process/crawl-spec/from-download", processingService::createCrawlSpecFromDownload);
|
||||
Spark.post("/process/load", processingService::startLoad);
|
||||
Spark.post("/process/adjacency-calculation", processingService::startAdjacencyCalculation);
|
||||
|
||||
Spark.get("/sideload/", sideloadService::listUploadDir, gson::toJson);
|
||||
Spark.post("/sideload/dirtree", sideloadService::sideloadDirtree);
|
||||
Spark.post("/sideload/warc", sideloadService::sideloadWarc);
|
||||
Spark.post("/sideload/stackexchange", sideloadService::sideloadStackexchange);
|
||||
Spark.post("/sideload/encyclopedia", sideloadService::sideloadEncyclopedia);
|
||||
|
||||
Spark.post("/action/download-sample-data", sideloadService::downloadSampleData);
|
||||
|
||||
Spark.post("/export/atags", exportService::exportAtags);
|
||||
Spark.post("/export/sample-data", exportService::exportSampleData);
|
||||
Spark.post("/export/feeds", exportService::exportFeeds);
|
||||
Spark.post("/export/termfreq", exportService::exportTermFrequencies);
|
||||
Spark.post("/export/data", exportService::exportData);
|
||||
|
||||
Spark.post("/backup/:fid/restore", backupService::restore);
|
||||
Spark.get("/storage/:fid", transferService::listFiles, gson::toJson);
|
||||
var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1)
|
||||
.addService(executorGrpcService)
|
||||
.build();
|
||||
grpcServer.start();
|
||||
|
||||
Spark.get("/transfer/file/:fid", transferService::transferFile);
|
||||
|
||||
@ -123,37 +83,4 @@ public class ExecutorSvc extends Service {
|
||||
return "OK";
|
||||
}
|
||||
|
||||
|
||||
private final ConcurrentHashMap<String, String> actorStateDescriptions = new ConcurrentHashMap<>();
|
||||
|
||||
private ActorRunStates getActorStates(Request request, Response response) {
|
||||
var items = actorControlService.getActorStates().entrySet().stream().map(e -> {
|
||||
final var stateGraph = actorControlService.getActorDefinition(e.getKey());
|
||||
|
||||
final ActorStateInstance state = e.getValue();
|
||||
final String actorDescription = stateGraph.describe();
|
||||
|
||||
final String machineName = e.getKey().name();
|
||||
final String stateName = state.name();
|
||||
|
||||
final String stateDescription = "";
|
||||
|
||||
final boolean terminal = state.isFinal();
|
||||
final boolean canStart = actorControlService.isDirectlyInitializable(e.getKey()) && terminal;
|
||||
|
||||
return new ActorRunState(machineName,
|
||||
stateName,
|
||||
actorDescription,
|
||||
stateDescription,
|
||||
terminal,
|
||||
canStart);
|
||||
})
|
||||
.filter(s -> !s.terminal() || s.canStart())
|
||||
.sorted(Comparator.comparing(ActorRunState::name))
|
||||
.toList();
|
||||
|
||||
return new ActorRunStates(params.configuration.node(), items);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -4,9 +4,8 @@ import com.google.inject.Inject;
|
||||
import nu.marginalia.actor.ExecutorActor;
|
||||
import nu.marginalia.actor.ExecutorActorControlService;
|
||||
import nu.marginalia.actor.task.RestoreBackupActor;
|
||||
import nu.marginalia.executor.api.RpcFileStorageId;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
|
||||
public class BackupService {
|
||||
private final ExecutorActorControlService actorControlService;
|
||||
@ -16,9 +15,8 @@ public class BackupService {
|
||||
this.actorControlService = actorControlService;
|
||||
}
|
||||
|
||||
public Object restore(Request request, Response response) throws Exception {
|
||||
var fid = FileStorageId.parse(request.params("fid"));
|
||||
public void restore(RpcFileStorageId request) throws Exception {
|
||||
var fid = FileStorageId.of(request.getFileStorageId());
|
||||
actorControlService.startFrom(ExecutorActor.RESTORE_BACKUP, new RestoreBackupActor.Restore(fid));
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
@ -3,13 +3,10 @@ package nu.marginalia.executor.svc;
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.actor.ExecutorActor;
|
||||
import nu.marginalia.actor.ExecutorActorControlService;
|
||||
import nu.marginalia.actor.task.ConvertActor;
|
||||
import nu.marginalia.actor.task.ExportAtagsActor;
|
||||
import nu.marginalia.actor.task.ExportDataActor;
|
||||
import nu.marginalia.actor.task.ExportSampleDataActor;
|
||||
import nu.marginalia.actor.task.*;
|
||||
import nu.marginalia.executor.api.RpcExportSampleData;
|
||||
import nu.marginalia.executor.api.RpcFileStorageId;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
|
||||
public class ExportService {
|
||||
private final ExecutorActorControlService actorControlService;
|
||||
@ -19,33 +16,36 @@ public class ExportService {
|
||||
this.actorControlService = actorControlService;
|
||||
}
|
||||
|
||||
public Object exportData(Request request, Response response) throws Exception {
|
||||
public void exportData() throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_DATA, new ExportDataActor.Export());
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object exportSampleData(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA, new ExportSampleDataActor.Export(
|
||||
FileStorageId.parse(request.queryParams("fid")),
|
||||
Integer.parseInt(request.queryParams("size")),
|
||||
request.queryParams("name")
|
||||
));
|
||||
return "";
|
||||
public void exportSampleData(RpcExportSampleData request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_SAMPLE_DATA,
|
||||
new ExportSampleDataActor.Export(
|
||||
FileStorageId.of(request.getFileStorageId()),
|
||||
request.getSize(),
|
||||
request.getName()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
public Object exportAtags(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid"))));
|
||||
return "";
|
||||
public void exportAtags(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_ATAGS,
|
||||
new ExportAtagsActor.Export(FileStorageId.of(request.getFileStorageId()))
|
||||
);
|
||||
}
|
||||
|
||||
public Object exportFeeds(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid"))));
|
||||
return "";
|
||||
}
|
||||
public Object exportTermFrequencies(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES, new ExportAtagsActor.Export(FileStorageId.parse(request.queryParams("fid"))));
|
||||
return "";
|
||||
public void exportFeeds(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_FEEDS,
|
||||
new ExportFeedsActor.Export(FileStorageId.of(request.getFileStorageId()))
|
||||
);
|
||||
}
|
||||
|
||||
public void exportTermFrequencies(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.EXPORT_TERM_FREQUENCIES,
|
||||
new ExportTermFreqActor.Export(FileStorageId.of(request.getFileStorageId()))
|
||||
);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1,84 +1,62 @@
|
||||
package nu.marginalia.executor.svc;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.actor.ExecutorActor;
|
||||
import nu.marginalia.actor.ExecutorActorControlService;
|
||||
import nu.marginalia.actor.task.*;
|
||||
import nu.marginalia.executor.api.RpcCrawlSpecFromDownload;
|
||||
import nu.marginalia.executor.api.RpcFileStorageId;
|
||||
import nu.marginalia.executor.api.RpcFileStorageIds;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import nu.marginalia.executor.model.load.LoadParameters;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ProcessingService {
|
||||
private final ExecutorActorControlService actorControlService;
|
||||
private final Gson gson;
|
||||
|
||||
@Inject
|
||||
public ProcessingService(ExecutorActorControlService actorControlService,
|
||||
Gson gson) {
|
||||
public ProcessingService(ExecutorActorControlService actorControlService) {
|
||||
this.actorControlService = actorControlService;
|
||||
this.gson = gson;
|
||||
}
|
||||
|
||||
public Object startRecrawl(Request request, Response response) throws Exception {
|
||||
var crawlId = gson.fromJson(request.body(), FileStorageId.class);
|
||||
|
||||
actorControlService.startFrom(
|
||||
ExecutorActor.RECRAWL,
|
||||
new RecrawlActor.Initial(crawlId, false)
|
||||
);
|
||||
|
||||
return "";
|
||||
public void startRecrawl(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.RECRAWL,
|
||||
new RecrawlActor.Initial(FileStorageId.of(request.getFileStorageId()), false));
|
||||
}
|
||||
|
||||
public Object startCrawl(Request request, Response response) throws Exception {
|
||||
public void startCrawl(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CRAWL,
|
||||
new CrawlActor.Initial(FileStorageId.parse(request.params("fid"))));
|
||||
|
||||
return "";
|
||||
new CrawlActor.Initial(FileStorageId.of(request.getFileStorageId())));
|
||||
}
|
||||
|
||||
public Object startConversion(Request request, Response response) throws Exception {
|
||||
public void startConversion(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT,
|
||||
new ConvertActor.Convert(FileStorageId.parse(request.params("fid"))));
|
||||
|
||||
return "";
|
||||
new ConvertActor.Convert(FileStorageId.of(request.getFileStorageId())));
|
||||
}
|
||||
|
||||
public Object startConvertLoad(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(
|
||||
ExecutorActor.CONVERT_AND_LOAD,
|
||||
new ConvertAndLoadActor.Initial(FileStorageId.parse(request.params("fid")))
|
||||
public void startConvertLoad(RpcFileStorageId request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD,
|
||||
new ConvertAndLoadActor.Initial(FileStorageId.of(request.getFileStorageId())));
|
||||
}
|
||||
|
||||
public void startLoad(RpcFileStorageIds request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT_AND_LOAD,
|
||||
new ConvertAndLoadActor.Load(request.getFileStorageIdsList()
|
||||
.stream()
|
||||
.map(FileStorageId::of)
|
||||
.collect(Collectors.toList()))
|
||||
);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
|
||||
public Object startLoad(Request request, Response response) throws Exception {
|
||||
var params = gson.fromJson(request.body(), LoadParameters.class);
|
||||
|
||||
// Start the FSM from the intermediate state that triggers the load
|
||||
actorControlService.startFrom(
|
||||
ExecutorActor.CONVERT_AND_LOAD,
|
||||
new ConvertAndLoadActor.Load(params.ids())
|
||||
);
|
||||
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object startAdjacencyCalculation(Request request, Response response) throws Exception {
|
||||
public void startAdjacencyCalculation() throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.ADJACENCY_CALCULATION, new TriggerAdjacencyCalculationActor.Run());
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object createCrawlSpecFromDownload(Request request, Response response) throws Exception {
|
||||
public void createCrawlSpecFromDownload(RpcCrawlSpecFromDownload request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CRAWL_JOB_EXTRACTOR,
|
||||
new CrawlJobExtractorActor.CreateFromUrl(
|
||||
request.queryParamOrDefault("description", ""),
|
||||
request.queryParamOrDefault("url", ""))
|
||||
request.getDescription(),
|
||||
request.getUrl())
|
||||
);
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
@ -6,18 +6,16 @@ import nu.marginalia.actor.ExecutorActor;
|
||||
import nu.marginalia.actor.ExecutorActorControlService;
|
||||
import nu.marginalia.actor.task.ConvertActor;
|
||||
import nu.marginalia.actor.task.DownloadSampleActor;
|
||||
import nu.marginalia.executor.upload.UploadDirContents;
|
||||
import nu.marginalia.executor.upload.UploadDirItem;
|
||||
import nu.marginalia.executor.api.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.ZoneId;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
public class SideloadService {
|
||||
private final ExecutorActorControlService actorControlService;
|
||||
@ -28,43 +26,61 @@ public class SideloadService {
|
||||
this.actorControlService = actorControlService;
|
||||
}
|
||||
|
||||
public Object sideloadDirtree(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertDirtree(request.queryParams("path")));
|
||||
return "";
|
||||
public void sideloadDirtree(RpcSideloadDirtree request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT,
|
||||
new ConvertActor.ConvertDirtree(request.getSourcePath())
|
||||
);
|
||||
}
|
||||
public Object sideloadWarc(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertWarc(request.queryParams("path")));
|
||||
return "";
|
||||
|
||||
public void sideloadWarc(RpcSideloadWarc request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT,
|
||||
new ConvertActor.ConvertWarc(request.getSourcePath())
|
||||
);
|
||||
}
|
||||
public Object sideloadEncyclopedia(Request request, Response response) throws Exception {
|
||||
|
||||
public void sideloadEncyclopedia(RpcSideloadEncyclopedia request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT,
|
||||
new ConvertActor.ConvertEncyclopedia(
|
||||
request.queryParams("path"),
|
||||
request.queryParams("baseUrl")
|
||||
request.getSourcePath(),
|
||||
request.getBaseUrl()
|
||||
));
|
||||
return "";
|
||||
}
|
||||
|
||||
public Object sideloadStackexchange(Request request, Response response) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertStackexchange(request.queryParams("path")));
|
||||
return "";
|
||||
public void sideloadStackexchange(RpcSideloadStackexchange request) throws Exception {
|
||||
actorControlService.startFrom(ExecutorActor.CONVERT,
|
||||
new ConvertActor.ConvertStackexchange(request.getSourcePath())
|
||||
);
|
||||
}
|
||||
|
||||
public UploadDirContents listUploadDir(Request request, Response response) throws IOException {
|
||||
public RpcUploadDirContents listUploadDir() throws IOException {
|
||||
Path uploadDir = WmsaHome.getUploadDir();
|
||||
|
||||
try (var items = Files.list(uploadDir)) {
|
||||
return new UploadDirContents(uploadDir.toString(),
|
||||
items.map(UploadDirItem::fromPath).toList());
|
||||
}
|
||||
var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString());
|
||||
|
||||
var iter = items.iterator();
|
||||
while (iter.hasNext()) {
|
||||
var path = iter.next();
|
||||
|
||||
boolean isDir = Files.isDirectory(path);
|
||||
long size = isDir ? 0 : Files.size(path);
|
||||
var mtime = Files.getLastModifiedTime(path);
|
||||
|
||||
builder.addEntriesBuilder()
|
||||
.setName(path.toString())
|
||||
.setIsDirectory(isDir)
|
||||
.setLastModifiedTime(
|
||||
LocalDateTime.ofInstant(mtime.toInstant(), ZoneId.systemDefault()).format(DateTimeFormatter.ISO_DATE_TIME))
|
||||
.setSize(size);
|
||||
}
|
||||
|
||||
return builder.build();
|
||||
}
|
||||
}
|
||||
|
||||
public Object downloadSampleData(Request request, Response response) throws Exception {
|
||||
String sampleSet = request.queryParams("set");
|
||||
public void downloadSampleData(RpcDownloadSampleData request) throws Exception {
|
||||
String sampleSet = request.getSampleSet();
|
||||
|
||||
actorControlService.startFrom(ExecutorActor.DOWNLOAD_SAMPLE, new DownloadSampleActor.Run(sampleSet));
|
||||
|
||||
return "";
|
||||
}
|
||||
}
|
||||
|
@ -5,11 +5,12 @@ import com.google.inject.Inject;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.executor.api.RpcFileStorageContent;
|
||||
import nu.marginalia.executor.api.RpcFileStorageEntry;
|
||||
import nu.marginalia.executor.api.RpcFileStorageId;
|
||||
import nu.marginalia.executor.client.ExecutorClient;
|
||||
import nu.marginalia.executor.model.transfer.TransferItem;
|
||||
import nu.marginalia.executor.model.transfer.TransferSpec;
|
||||
import nu.marginalia.executor.storage.FileStorageContent;
|
||||
import nu.marginalia.executor.storage.FileStorageFile;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.mq.persistence.MqPersistence;
|
||||
import nu.marginalia.process.log.WorkLog;
|
||||
@ -74,30 +75,31 @@ public class TransferService {
|
||||
}
|
||||
|
||||
|
||||
public FileStorageContent listFiles(Request request, Response response) throws SQLException, IOException {
|
||||
FileStorageId fileStorageId = FileStorageId.parse(request.params("fid"));
|
||||
public RpcFileStorageContent listFiles(RpcFileStorageId request) throws SQLException, IOException {
|
||||
FileStorageId fileStorageId = FileStorageId.of(request.getFileStorageId());
|
||||
|
||||
var storage = fileStorageService.getStorage(fileStorageId);
|
||||
|
||||
List<FileStorageFile> files;
|
||||
var builder = RpcFileStorageContent.newBuilder();
|
||||
|
||||
|
||||
try (var fs = Files.list(storage.asPath())) {
|
||||
files = fs.filter(Files::isRegularFile)
|
||||
fs.filter(Files::isRegularFile)
|
||||
.map(this::createFileModel)
|
||||
.sorted(Comparator.comparing(FileStorageFile::name))
|
||||
.toList();
|
||||
.sorted(Comparator.comparing(RpcFileStorageEntry::getName))
|
||||
.forEach(builder::addEntries);
|
||||
}
|
||||
|
||||
return new FileStorageContent(files);
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
@SneakyThrows
|
||||
private FileStorageFile createFileModel(Path path) {
|
||||
return new FileStorageFile(
|
||||
path.toFile().getName(),
|
||||
Files.size(path),
|
||||
Files.getLastModifiedTime(path).toInstant().toString()
|
||||
);
|
||||
private RpcFileStorageEntry createFileModel(Path path) {
|
||||
return RpcFileStorageEntry.newBuilder()
|
||||
.setName(path.toFile().getName())
|
||||
.setSize(Files.size(path))
|
||||
.setLastModifiedTime(Files.getLastModifiedTime(path).toInstant().toString())
|
||||
.build();
|
||||
}
|
||||
|
||||
public TransferSpec getTransferSpec(Request request, Response response) throws SQLException {
|
||||
|
@ -1,155 +0,0 @@
|
||||
package nu.marginalia.executor;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Provides;
|
||||
import nu.marginalia.actor.ExecutorActor;
|
||||
import nu.marginalia.actor.ExecutorActorControlService;
|
||||
import nu.marginalia.actor.task.TriggerAdjacencyCalculationActor;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.client.route.RouteProvider;
|
||||
import nu.marginalia.process.ProcessOutboxes;
|
||||
import nu.marginalia.process.ProcessService;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
import nu.marginalia.executor.client.ExecutorClient;
|
||||
import nu.marginalia.index.client.IndexClient;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.inbox.MqAsynchronousInbox;
|
||||
import nu.marginalia.service.control.ServiceEventLog;
|
||||
import nu.marginalia.service.control.ServiceHeartbeatImpl;
|
||||
import nu.marginalia.service.descriptor.ServiceDescriptor;
|
||||
import nu.marginalia.service.descriptor.ServiceDescriptors;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
import nu.marginalia.service.server.MetricsServer;
|
||||
import org.junit.jupiter.api.*;
|
||||
import org.mockito.Mockito;
|
||||
import spark.Spark;
|
||||
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
|
||||
@Tag("slow")
|
||||
public class ExecutorSvcApiIntegrationTest {
|
||||
|
||||
static TestInstances testInstances;
|
||||
static final int port = 9999;
|
||||
|
||||
@BeforeAll
|
||||
public static void setUpAll() {
|
||||
RouteProvider.setDefaultPort(port);
|
||||
var injector = Guice.createInjector(new TestModule());
|
||||
testInstances = injector.getInstance(TestInstances.class);
|
||||
injector.getInstance(Initialization.class).setReady();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void tearDownAll() {
|
||||
RouteProvider.resetDefaultPort();
|
||||
}
|
||||
|
||||
@BeforeEach
|
||||
public void setUp() {
|
||||
Mockito.reset(testInstances.actorControlService);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void tearDown() {
|
||||
Spark.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void startStartActor() throws Exception {
|
||||
testInstances.client.startFsm(Context.internal(), 0, "crawl");
|
||||
Mockito.verify(testInstances.actorControlService).start(ExecutorActor.CRAWL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void startStopActor() {
|
||||
testInstances.client.stopFsm(Context.internal(), 0, "crawl");
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).stop(ExecutorActor.CRAWL);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void triggerCrawl() throws Exception {
|
||||
testInstances.client.triggerCrawl(Context.internal(), 0, FileStorageId.of(1));
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CRAWL), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void triggerRecrawl() throws Exception {
|
||||
testInstances.client.triggerRecrawl(Context.internal(), 0,
|
||||
new FileStorageId(0));
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.RECRAWL), any());
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void triggerProcessAndLoad() throws Exception {
|
||||
testInstances.client.triggerConvertAndLoad(Context.internal(), 0, FileStorageId.of(1));
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.CONVERT_AND_LOAD), any());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void calculateAdjacencies() throws Exception {
|
||||
testInstances.client.calculateAdjacencies(Context.internal(), 0);
|
||||
|
||||
Mockito.verify(testInstances.actorControlService).startFrom(eq(ExecutorActor.ADJACENCY_CALCULATION), eq(new TriggerAdjacencyCalculationActor.Run()));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
class TestInstances {
|
||||
@Inject
|
||||
ExecutorSvc service;
|
||||
@Inject
|
||||
ExecutorClient client;
|
||||
@Inject
|
||||
ExecutorActorControlService actorControlService;
|
||||
}
|
||||
class TestModule extends AbstractModule {
|
||||
|
||||
@Override
|
||||
public void configure() {
|
||||
System.setProperty("service-name", "test");
|
||||
bind(ExecutorActorControlService.class).toInstance(Mockito.mock(ExecutorActorControlService.class));
|
||||
bind(FileStorageService.class).toInstance(Mockito.mock(FileStorageService.class));
|
||||
bind(ProcessService.class).toInstance(Mockito.mock(ProcessService.class));
|
||||
bind(ServiceEventLog.class).toInstance(Mockito.mock(ServiceEventLog.class));
|
||||
bind(ServiceHeartbeatImpl.class).toInstance(Mockito.mock(ServiceHeartbeatImpl.class));
|
||||
bind(MetricsServer.class).toInstance(Mockito.mock(MetricsServer.class));
|
||||
bind(IndexClient.class).toInstance(Mockito.mock(IndexClient.class));
|
||||
bind(ProcessOutboxes.class).toInstance(Mockito.mock(ProcessOutboxes.class));
|
||||
bind(ServiceConfiguration.class)
|
||||
.toInstance(new ServiceConfiguration(ServiceId.Executor,
|
||||
0, "127.0.0.1", ExecutorSvcApiIntegrationTest.port, -1, UUID.randomUUID()));
|
||||
}
|
||||
|
||||
@Provides
|
||||
public ServiceDescriptors getServiceDescriptors() {
|
||||
return new ServiceDescriptors(
|
||||
List.of(new ServiceDescriptor(ServiceId.Executor, "127.0.0.1"))
|
||||
);
|
||||
}
|
||||
|
||||
@Provides
|
||||
public MessageQueueFactory getMessageQueueFactory() {
|
||||
var mock = Mockito.mock(MessageQueueFactory.class);
|
||||
|
||||
Mockito.when(mock.createAsynchronousInbox(Mockito.anyString(), Mockito.anyInt(), any())).
|
||||
thenReturn(Mockito.mock(MqAsynchronousInbox.class));
|
||||
|
||||
return mock;
|
||||
}
|
||||
}
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.query.svc;
|
||||
package nu.marginalia.query;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import lombok.SneakyThrows;
|
||||
@ -6,7 +6,6 @@ import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.SQLException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
@ -4,11 +4,9 @@ import com.google.gson.Gson;
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.index.client.IndexClient;
|
||||
import nu.marginalia.index.client.model.query.SearchSetIdentifier;
|
||||
import nu.marginalia.index.query.limit.QueryLimits;
|
||||
import nu.marginalia.model.gson.GsonFactory;
|
||||
import nu.marginalia.query.model.QueryParams;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
import nu.marginalia.renderer.MustacheRenderer;
|
||||
import nu.marginalia.renderer.RendererFactory;
|
||||
import nu.marginalia.query.svc.QueryFactory;
|
||||
|
@ -3,28 +3,34 @@ package nu.marginalia.query;
|
||||
import com.google.inject.Inject;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import nu.marginalia.client.grpc.GrpcStubPool;
|
||||
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
|
||||
import nu.marginalia.index.api.RpcDomainIdCount;
|
||||
import nu.marginalia.index.api.RpcDomainIdList;
|
||||
import nu.marginalia.index.api.RpcDomainIdPairs;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
|
||||
public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase {
|
||||
private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class);
|
||||
private final NodeConfigurationWatcher nodeConfigurationWatcher;
|
||||
private final QueryGrpcStubPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> stubPool;
|
||||
private final GrpcStubPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> stubPool;
|
||||
|
||||
@Inject
|
||||
public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) {
|
||||
this.nodeConfigurationWatcher = nodeConfigurationWatcher;
|
||||
stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) {
|
||||
stubPool = new GrpcStubPool<>(ServiceId.Index) {
|
||||
@Override
|
||||
public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) {
|
||||
return IndexDomainLinksApiGrpc.newBlockingStub(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Integer> getEligibleNodes() {
|
||||
return nodeConfigurationWatcher.getQueryNodes();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -4,11 +4,12 @@ import com.google.inject.Inject;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.client.grpc.GrpcStubPool;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
import nu.marginalia.index.api.*;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
import nu.marginalia.query.svc.QueryFactory;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -26,23 +27,29 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
||||
.help("QS-side query time (GRPC endpoint)")
|
||||
.register();
|
||||
|
||||
private final QueryGrpcStubPool<IndexApiGrpc.IndexApiBlockingStub> stubPool;
|
||||
private final GrpcStubPool<IndexApiGrpc.IndexApiBlockingStub> stubPool;
|
||||
|
||||
private final QueryFactory queryFactory;
|
||||
private final DomainBlacklist blacklist;
|
||||
private final NodeConfigurationWatcher nodeConfigurationWatcher;
|
||||
|
||||
@Inject
|
||||
public QueryGRPCService(QueryFactory queryFactory, DomainBlacklist blacklist, NodeConfigurationWatcher nodeConfigurationWatcher) {
|
||||
public QueryGRPCService(QueryFactory queryFactory,
|
||||
DomainBlacklist blacklist,
|
||||
NodeConfigurationWatcher nodeConfigurationWatcher)
|
||||
{
|
||||
this.queryFactory = queryFactory;
|
||||
this.blacklist = blacklist;
|
||||
this.nodeConfigurationWatcher = nodeConfigurationWatcher;
|
||||
|
||||
stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) {
|
||||
stubPool = new GrpcStubPool<>(ServiceId.Index) {
|
||||
@Override
|
||||
public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) {
|
||||
return IndexApiGrpc.newBlockingStub(channel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Integer> getEligibleNodes() {
|
||||
return nodeConfigurationWatcher.getQueryNodes();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,6 @@ import nu.marginalia.index.client.model.results.DecoratedSearchResultItem;
|
||||
import nu.marginalia.index.client.model.results.SearchResultSet;
|
||||
import nu.marginalia.query.model.QueryParams;
|
||||
import nu.marginalia.query.model.QueryResponse;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
import nu.marginalia.query.svc.QueryFactory;
|
||||
import nu.marginalia.service.server.BaseServiceParams;
|
||||
import nu.marginalia.service.server.Service;
|
||||
|
29
protobuf.gradle
Normal file
29
protobuf.gradle
Normal file
@ -0,0 +1,29 @@
|
||||
// Boilerplate configuration that should be included whenever protobufs are used
|
||||
// see e.g. index-api's build.gradle
|
||||
|
||||
protobuf {
|
||||
protoc {
|
||||
if (osdetector.os == "osx") {
|
||||
artifact = "com.google.protobuf:protoc:3.0.2:osx-x86_64"
|
||||
} else {
|
||||
artifact = "com.google.protobuf:protoc:3.0.2"
|
||||
}
|
||||
}
|
||||
plugins {
|
||||
grpc {
|
||||
if (osdetector.os == "osx") {
|
||||
artifact = "io.grpc:protoc-gen-grpc-java:1.1.2:osx-x86_64"
|
||||
} else {
|
||||
artifact = "io.grpc:protoc-gen-grpc-java:1.1.2"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
generateProtoTasks {
|
||||
all().each { task ->
|
||||
task.plugins {
|
||||
grpc {}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user