Merge pull request #81 from MarginaliaSearch/service-discovery
Zookeeper for service-discovery, kill service-client lib, refactor everything
Commit 0d6e7673e4

build.gradle (43 changed lines)
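The diff below removes the old static, ServiceDescriptors-based client routing, but the new ZooKeeper-backed registry itself is not among the files shown on this page. As a rough sketch of the general pattern only (not the project's actual API), registration and lookup over ZooKeeper via Apache Curator's service-discovery recipe might look like the following; the service name, address, port, connect string, and `/services` base path are placeholder assumptions.

```java
// Illustrative sketch only: ZooKeeper-backed service discovery via Apache Curator.
// Names ("index-service", "/services", "zookeeper:2181", the address/port) are assumptions.
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;

public class DiscoverySketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework curator = CuratorFrameworkFactory.newClient(
                "zookeeper:2181", new ExponentialBackoffRetry(1000, 3));
        curator.start();

        // Register this instance; Curator stores dynamic instances as ephemeral
        // znodes, so the registration disappears if the service dies.
        ServiceInstance<Void> self = ServiceInstance.<Void>builder()
                .name("index-service")
                .address("10.0.0.5")
                .port(80)
                .build();

        ServiceDiscovery<Void> discovery = ServiceDiscoveryBuilder.builder(Void.class)
                .client(curator)
                .basePath("/services")
                .thisInstance(self)
                .build();
        discovery.start();

        // A caller resolves live instances by name instead of consulting a static table.
        for (ServiceInstance<Void> instance : discovery.queryForInstances("index-service")) {
            System.out.println(instance.getAddress() + ":" + instance.getPort());
        }

        discovery.close();
        curator.close();
    }
}
```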
@@ -3,6 +3,10 @@ plugins {
    id("org.jetbrains.gradle.plugin.idea-ext") version "1.0"
    id "io.freefair.lombok" version "8.3"
    id "me.champeau.jmh" version "0.6.6"

    // This is a workaround for a bug in the Jib plugin that causes it to stall randomly
    // https://github.com/GoogleContainerTools/jib/issues/3347
    id 'com.google.cloud.tools.jib' version '3.4.0' apply(false)
}

group 'marginalia'
@@ -13,6 +17,14 @@ compileTestJava.options.encoding = "UTF-8"

subprojects.forEach {it ->
    // Enable preview features for the entire project

    if (it.path.contains(':code:')) {
        sourceSets.main.java.srcDirs += file('java')
        sourceSets.main.resources.srcDirs += file('resources')
        sourceSets.test.java.srcDirs += file('test')
        sourceSets.test.resources.srcDirs += file('test-resources')
    }

    it.tasks.withType(JavaCompile).configureEach {
        options.compilerArgs += ['--enable-preview']
    }
@@ -28,32 +40,12 @@ subprojects.forEach {it ->
        preserveFileTimestamps = false
        reproducibleFileOrder = true
    }
}

allprojects {
    apply plugin: 'java'
    apply plugin: 'io.freefair.lombok'

    dependencies {
        implementation libs.lombok
        testImplementation libs.lombok
        annotationProcessor libs.lombok

        lombok libs.lombok // prevent plugin from downgrading the version to something incompatible with '19
    }

    test {
        maxHeapSize = "8G"
        useJUnitPlatform()
    }

    tasks.register('fastTests', Test) {
        maxHeapSize = "8G"
        useJUnitPlatform {
            excludeTags "slow"
        }
    }

    ext {
        dockerImageBase='container-registry.oracle.com/graalvm/jdk:21@sha256:1fd33d4d4eba3a9e1a41a728e39ea217178d257694eea1214fec68d2ed4d3d9b'
        dockerImageTag='latest'
        dockerImageRegistry='marginalia'
    }

    idea {
@@ -77,3 +69,4 @@ java {
        languageVersion.set(JavaLanguageVersion.of(21))
    }
}

@@ -1,8 +0,0 @@
# Assistant API

Client and models for talking to the [assistant-service](../../services-core/assistant-service),
implemented with the base client from [service-client](../../common/service-client).

## Central Classes

* [AssistantClient](src/main/java/nu/marginalia/assistant/client/AssistantClient.java)

@@ -1,95 +0,0 @@
package nu.marginalia.assistant.client;

import com.google.gson.reflect.TypeToken;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.assistant.client.model.DictionaryResponse;
import nu.marginalia.assistant.client.model.DomainInformation;
import nu.marginalia.assistant.client.model.SimilarDomain;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.exception.RouteNotConfiguredException;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.client.Context;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

@Singleton
public class AssistantClient extends AbstractDynamicClient {

    @Inject
    public AssistantClient(ServiceDescriptors descriptors) {
        super(descriptors.forId(ServiceId.Assistant), GsonFactory::get);
    }

    public Observable<DictionaryResponse> dictionaryLookup(Context ctx, String word) {
        try {
            return super.get(ctx, 0, "/dictionary/" + URLEncoder.encode(word, StandardCharsets.UTF_8), DictionaryResponse.class);
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }

    @SuppressWarnings("unchecked")
    public Observable<List<String>> spellCheck(Context ctx, String word) {
        try {
            return (Observable<List<String>>) (Object) super.get(ctx, 0, "/spell-check/" + URLEncoder.encode(word, StandardCharsets.UTF_8), List.class);
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }

    public Observable<String> unitConversion(Context ctx, String value, String from, String to) {
        try {
            return super.get(ctx, 0, "/unit-conversion?value=" + value + "&from=" + from + "&to=" + to);
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }

    public Observable<String> evalMath(Context ctx, String expression) {
        try {
            return super.get(ctx, 0, "/eval-expression?value=" + URLEncoder.encode(expression, StandardCharsets.UTF_8));
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }

    public Observable<ArrayList<SimilarDomain>> similarDomains(Context ctx, int domainId, int count) {
        try {
            return super.get(ctx, 0, STR."/domain/\{domainId}/similar?count=\{count}", new TypeToken<ArrayList<SimilarDomain>>() {})
                    .onErrorResumeWith(Observable.just(new ArrayList<>()));
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }

    public Observable<ArrayList<SimilarDomain>> linkedDomains(Context ctx, int domainId, int count) {
        try {
            return super.get(ctx, 0, STR."/domain/\{domainId}/linking?count=\{count}", new TypeToken<ArrayList<SimilarDomain>>() {})
                    .onErrorResumeWith(Observable.just(new ArrayList<>()));
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }

    public Observable<DomainInformation> domainInformation(Context ctx, int domainId) {
        try {
            return super.get(ctx, 0, STR."/domain/\{domainId}/info", DomainInformation.class)
                    .onErrorResumeWith(Observable.just(new DomainInformation()));
        }
        catch (RouteNotConfiguredException ex) {
            return Observable.empty();
        }
    }
}

@@ -1,307 +0,0 @@
package nu.marginalia.executor.client;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.client.grpc.GrpcChannelPool;
import nu.marginalia.executor.api.*;
import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
import nu.marginalia.executor.model.ActorRunState;
import nu.marginalia.executor.model.ActorRunStates;
import nu.marginalia.executor.model.transfer.TransferItem;
import nu.marginalia.executor.model.transfer.TransferSpec;
import nu.marginalia.executor.storage.FileStorageContent;
import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.executor.upload.UploadDirContents;
import nu.marginalia.executor.upload.UploadDirItem;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.storage.model.FileStorageId;

import io.grpc.ManagedChannel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.OutputStream;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Singleton
public class ExecutorClient extends AbstractDynamicClient {
    private final GrpcChannelPool<ExecutorApiBlockingStub> channelPool;
    private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);

    @Inject
    public ExecutorClient(ServiceDescriptors descriptors, NodeConfigurationService nodeConfigurationService) {
        super(descriptors.forId(ServiceId.Executor), GsonFactory::get);

        channelPool = new GrpcChannelPool<>(ServiceId.Executor) {
            @Override
            public ExecutorApiBlockingStub createStub(ManagedChannel channel) {
                return ExecutorApiGrpc.newBlockingStub(channel);
            }

            @Override
            public List<Integer> getEligibleNodes() {
                return nodeConfigurationService.getAll()
                        .stream()
                        .map(NodeConfiguration::node)
                        .toList();
            }
        };
    }

    public void startFsm(int node, String actorName) {
        channelPool.apiForNode(node).startFsm(
                RpcFsmName.newBuilder()
                        .setActorName(actorName)
                        .build()
        );
    }

    public void stopFsm(int node, String actorName) {
        channelPool.apiForNode(node).stopFsm(
                RpcFsmName.newBuilder()
                        .setActorName(actorName)
                        .build()
        );
    }

    public void stopProcess(int node, String id) {
        channelPool.apiForNode(node).stopProcess(
                RpcProcessId.newBuilder()
                        .setProcessId(id)
                        .build()
        );
    }

    public void triggerCrawl(int node, FileStorageId fid) {
        channelPool.apiForNode(node).triggerCrawl(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void triggerRecrawl(int node, FileStorageId fid) {
        channelPool.apiForNode(node).triggerRecrawl(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void triggerConvert(int node, FileStorageId fid) {
        channelPool.apiForNode(node).triggerConvert(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void triggerConvertAndLoad(int node, FileStorageId fid) {
        channelPool.apiForNode(node).triggerConvertAndLoad(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void loadProcessedData(int node, List<FileStorageId> ids) {
        channelPool.apiForNode(node).loadProcessedData(
                RpcFileStorageIds.newBuilder()
                        .addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList())
                        .build()
        );
    }

    public void calculateAdjacencies(int node) {
        channelPool.apiForNode(node).calculateAdjacencies(Empty.getDefaultInstance());
    }

    public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) {
        channelPool.apiForNode(node).sideloadEncyclopedia(
                RpcSideloadEncyclopedia.newBuilder()
                        .setBaseUrl(baseUrl)
                        .setSourcePath(sourcePath.toString())
                        .build()
        );
    }

    public void sideloadDirtree(int node, Path sourcePath) {
        channelPool.apiForNode(node).sideloadDirtree(
                RpcSideloadDirtree.newBuilder()
                        .setSourcePath(sourcePath.toString())
                        .build()
        );
    }

    public void sideloadReddit(int node, Path sourcePath) {
        channelPool.apiForNode(node).sideloadReddit(
                RpcSideloadReddit.newBuilder()
                        .setSourcePath(sourcePath.toString())
                        .build()
        );
    }

    public void sideloadWarc(int node, Path sourcePath) {
        channelPool.apiForNode(node).sideloadWarc(
                RpcSideloadWarc.newBuilder()
                        .setSourcePath(sourcePath.toString())
                        .build()
        );
    }

    public void sideloadStackexchange(int node, Path sourcePath) {
        channelPool.apiForNode(node).sideloadStackexchange(
                RpcSideloadStackexchange.newBuilder()
                        .setSourcePath(sourcePath.toString())
                        .build()
        );
    }

    public void createCrawlSpecFromDownload(int node, String description, String url) {
        channelPool.apiForNode(node).createCrawlSpecFromDownload(
                RpcCrawlSpecFromDownload.newBuilder()
                        .setDescription(description)
                        .setUrl(url)
                        .build()
        );
    }

    public void exportAtags(int node, FileStorageId fid) {
        channelPool.apiForNode(node).exportAtags(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void exportSampleData(int node, FileStorageId fid, int size, String name) {
        channelPool.apiForNode(node).exportSampleData(
                RpcExportSampleData.newBuilder()
                        .setFileStorageId(fid.id())
                        .setSize(size)
                        .setName(name)
                        .build()
        );
    }

    public void exportRssFeeds(int node, FileStorageId fid) {
        channelPool.apiForNode(node).exportRssFeeds(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void exportTermFrequencies(int node, FileStorageId fid) {
        channelPool.apiForNode(node).exportTermFrequencies(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public void downloadSampleData(int node, String sampleSet) {
        channelPool.apiForNode(node).downloadSampleData(
                RpcDownloadSampleData.newBuilder()
                        .setSampleSet(sampleSet)
                        .build()
        );
    }

    public void exportData(int node) {
        channelPool.apiForNode(node).exportData(Empty.getDefaultInstance());
    }

    public void restoreBackup(int node, FileStorageId fid) {
        channelPool.apiForNode(node).restoreBackup(
                RpcFileStorageId.newBuilder()
                        .setFileStorageId(fid.id())
                        .build()
        );
    }

    public ActorRunStates getActorStates(int node) {
        try {
            var rs = channelPool.apiForNode(node).getActorStates(Empty.getDefaultInstance());
            var states = rs.getActorRunStatesList().stream()
                    .map(r -> new ActorRunState(
                            r.getActorName(),
                            r.getState(),
                            r.getActorDescription(),
                            r.getStateDescription(),
                            r.getTerminal(),
                            r.getCanStart())
                    )
                    .toList();

            return new ActorRunStates(node, states);
        }
        catch (Exception ex) {
            logger.warn("Failed to get actor states", ex);

            // Return an empty list of states to avoid breaking the UI when a node is down
            return new ActorRunStates(node, List.of());
        }
    }

    public UploadDirContents listSideloadDir(int node) {
        try {
            var rs = channelPool.apiForNode(node).listSideloadDir(Empty.getDefaultInstance());
            var items = rs.getEntriesList().stream()
                    .map(i -> new UploadDirItem(i.getName(), i.getLastModifiedTime(), i.getIsDirectory(), i.getSize()))
                    .toList();
            return new UploadDirContents(rs.getPath(), items);
        }
        catch (Exception ex) {
            logger.warn("Failed to list sideload dir", ex);

            // Return an empty list of items to avoid breaking the UI when a node is down
            return new UploadDirContents("", List.of());
        }
    }

    public FileStorageContent listFileStorage(int node, FileStorageId fileId) {
        try {
            var rs = channelPool.apiForNode(node).listFileStorage(
                    RpcFileStorageId.newBuilder()
                            .setFileStorageId(fileId.id())
                            .build()
            );

            return new FileStorageContent(rs.getEntriesList().stream()
                    .map(e -> new FileStorageFile(e.getName(), e.getSize(), e.getLastModifiedTime()))
                    .toList());
        }
        catch (Exception ex) {
            logger.warn("Failed to list file storage", ex);

            // Return an empty list of items to avoid breaking the UI when a node is down
            return new FileStorageContent(List.of());
        }
    }

    public void transferFile(Context context, int node, FileStorageId fileId, String path, OutputStream destOutputStream) {
        String endpoint = "/transfer/file/%d?path=%s".formatted(fileId.id(), URLEncoder.encode(path, StandardCharsets.UTF_8));

        get(context, node, endpoint,
                destOutputStream)
                .blockingSubscribe();
    }

    public TransferSpec getTransferSpec(Context context, int node, int count) {
        return get(context, node, "/transfer/spec?count="+count, TransferSpec.class)
                .timeout(30, TimeUnit.MINUTES)
                .blockingFirst();
    }

    public void yieldDomain(Context context, int node, TransferItem item) {
        post(context, node, "/transfer/yield", item).blockingSubscribe();
    }

}

@@ -1,9 +0,0 @@
package nu.marginalia.executor.model.transfer;

import nu.marginalia.storage.model.FileStorageId;

public record TransferItem(String domainName,
                           int domainId,
                           FileStorageId fileStorageId,
                           String path) {
}

@@ -1,13 +0,0 @@
package nu.marginalia.executor.model.transfer;

import java.util.List;

public record TransferSpec(List<TransferItem> items) {
    public TransferSpec() {
        this(List.of());
    }

    public int size() {
        return items.size();
    }
}

@@ -1,8 +0,0 @@
# Index API

Client and models for talking to the [index-service](../../services-core/index-service),
implemented with the base client from [service-client](../../common/service-client).

## Central Classes

* [IndexClient](src/main/java/nu/marginalia/index/client/IndexClient.java)

@@ -1,95 +0,0 @@
package nu.marginalia.index.client;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import io.prometheus.client.Summary;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.client.exception.RouteNotConfiguredException;
import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;

import java.util.List;

import javax.annotation.CheckReturnValue;
import java.util.UUID;

@Singleton
public class IndexClient extends AbstractDynamicClient {

    private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register();
    private final MessageQueueFactory messageQueueFactory;

    MqOutbox outbox;

    @Inject
    public IndexClient(ServiceDescriptors descriptors,
                       MessageQueueFactory messageQueueFactory,
                       @Named("wmsa-system-node") Integer nodeId)
    {
        super(descriptors.forId(ServiceId.Index), GsonFactory::get);
        this.messageQueueFactory = messageQueueFactory;

        String inboxName = ServiceId.Index.serviceName;
        String outboxName = "pp:"+System.getProperty("service-name", UUID.randomUUID().toString());
        outbox = messageQueueFactory.createOutbox(inboxName, nodeId, outboxName, nodeId, UUID.randomUUID());
        setTimeout(30);
    }

    public MqOutbox outbox() {
        return outbox;
    }

    @CheckReturnValue
    public SearchResultSet query(Context ctx, int node, SearchSpecification specs) {
        return wmsa_search_index_api_time.time(
                () -> this.postGet(ctx, node,"/search/", specs, SearchResultSet.class).blockingFirst()
        );
    }

    @CheckReturnValue
    public SearchResultSet query(Context ctx, List<Integer> nodes, SearchSpecification specs) {
        return Observable.fromIterable(nodes)
                .flatMap(node -> {
                    try {
                        return this
                                .postGet(ctx, node, "/search/", specs, SearchResultSet.class).onErrorReturn(t -> new SearchResultSet())
                                .observeOn(Schedulers.io());
                    } catch (RouteNotConfiguredException ex) {
                        return Observable.empty();
                    }
                })
                .reduce(SearchResultSet::combine)
                .blockingGet();
    }

    @CheckReturnValue
    public Observable<Boolean> isBlocked(Context ctx, int node) {
        return super.get(ctx, node, "/is-blocked", Boolean.class);
    }

    public long triggerRepartition(int node) throws Exception {
        return messageQueueFactory.sendSingleShotRequest(
                ServiceId.Index.withNode(node),
                IndexMqEndpoints.INDEX_REPARTITION,
                null
        );
    }

    public long triggerRerank(int node) throws Exception {
        return messageQueueFactory.sendSingleShotRequest(
                ServiceId.Index.withNode(node),
                IndexMqEndpoints.INDEX_RERANK,
                null
        );
    }
}

@@ -1,204 +0,0 @@
package nu.marginalia.query.client;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import gnu.trove.list.array.TIntArrayList;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.prometheus.client.Summary;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.index.api.Empty;
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
import nu.marginalia.index.api.QueryApiGrpc;
import nu.marginalia.index.api.RpcDomainId;
import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.query.QueryProtobufCodec;
import nu.marginalia.query.model.QueryParams;
import nu.marginalia.query.model.QueryResponse;
import nu.marginalia.service.descriptor.ServiceDescriptor;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import org.roaringbitmap.PeekableCharIterator;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.CheckReturnValue;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Singleton
public class QueryClient extends AbstractDynamicClient {

    private static final Summary wmsa_qs_api_delegate_time = Summary.build()
            .name("wmsa_qs_api_delegate_time")
            .help("query service delegate time")
            .register();
    private static final Summary wmsa_qs_api_search_time = Summary.build()
            .name("wmsa_qs_api_search_time")
            .help("query service search time")
            .register();

    private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>();
    private final Map<ServiceAndNode, QueryApiGrpc.QueryApiBlockingStub > queryIndexApis = new ConcurrentHashMap<>();
    private final Map<ServiceAndNode, IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> domainLinkApis = new ConcurrentHashMap<>();

    record ServiceAndNode(String service, int node) {
        public String getHostName() {
            return service;
        }
    }

    private ManagedChannel getChannel(ServiceAndNode serviceAndNode) {
        return channels.computeIfAbsent(serviceAndNode,
                san -> ManagedChannelBuilder
                        .forAddress(serviceAndNode.getHostName(), 81)
                        .usePlaintext()
                        .build());
    }

    public QueryApiGrpc.QueryApiBlockingStub queryApi(int node) {
        return queryIndexApis.computeIfAbsent(new ServiceAndNode("query-service", node), n ->
                QueryApiGrpc.newBlockingStub(
                        getChannel(n)
                )
        );
    }

    public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub domainApi(int node) {
        return domainLinkApis.computeIfAbsent(new ServiceAndNode("query-service", node), n ->
                IndexDomainLinksApiGrpc.newBlockingStub(
                        getChannel(n)
                )
        );
    }

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @Inject
    public QueryClient(ServiceDescriptors descriptors) {

        super(descriptors.forId(ServiceId.Query), GsonFactory::get);
    }
    public QueryClient() {
        super(new ServiceDescriptor(ServiceId.Query, "query-service"), GsonFactory::get);
    }

    /** Delegate an Index API style query directly to the index service */
    @CheckReturnValue
    public SearchResultSet delegate(Context ctx, SearchSpecification specs) {
        return wmsa_qs_api_delegate_time.time(
                () -> this.postGet(ctx, 0, "/delegate/", specs, SearchResultSet.class).blockingFirst()
        );
    }

    @CheckReturnValue
    public QueryResponse search(Context ctx, QueryParams params) {
        return wmsa_qs_api_search_time.time(
                () -> QueryProtobufCodec.convertQueryResponse(queryApi(0).query(QueryProtobufCodec.convertQueryParams(params)))
        );
    }

    public AllLinks getAllDomainLinks() {
        AllLinks links = new AllLinks();

        domainApi(0).getAllLinks(Empty.newBuilder().build()).forEachRemaining(pairs -> {
            for (int i = 0; i < pairs.getDestIdsCount(); i++) {
                links.add(pairs.getSourceIds(i), pairs.getDestIds(i));
            }
        });

        return links;
    }

    public List<Integer> getLinksToDomain(int domainId) {
        try {
            return domainApi(0).getLinksToDomain(RpcDomainId
                            .newBuilder()
                            .setDomainId(domainId)
                            .build())
                    .getDomainIdList();
        }
        catch (Exception e) {
            logger.error("API Exception", e);
            return List.of();
        }
    }

    public List<Integer> getLinksFromDomain(int domainId) {
        try {
            return domainApi(0).getLinksFromDomain(RpcDomainId
                            .newBuilder()
                            .setDomainId(domainId)
                            .build())
                    .getDomainIdList();
        }
        catch (Exception e) {
            logger.error("API Exception", e);
            return List.of();
        }
    }

    public int countLinksToDomain(int domainId) {
        try {
            return domainApi(0).countLinksToDomain(RpcDomainId
                            .newBuilder()
                            .setDomainId(domainId)
                            .build())
                    .getIdCount();
        }
        catch (Exception e) {
            logger.error("API Exception", e);
            return 0;
        }
    }

    public int countLinksFromDomain(int domainId) {
        try {
            return domainApi(0).countLinksFromDomain(RpcDomainId
                            .newBuilder()
                            .setDomainId(domainId)
                            .build())
                    .getIdCount();
        }
        catch (Exception e) {
            logger.error("API Exception", e);
            return 0;
        }
    }

    public static class AllLinks {
        private final Roaring64Bitmap sourceToDest = new Roaring64Bitmap();

        public void add(int source, int dest) {
            sourceToDest.add(Integer.toUnsignedLong(source) << 32 | Integer.toUnsignedLong(dest));
        }

        public Iterator iterator() {
            return new Iterator();
        }

        public class Iterator {
            private final PeekableLongIterator base = sourceToDest.getLongIterator();
            long val = Long.MIN_VALUE;

            public boolean advance() {
                if (base.hasNext()) {
                    val = base.next();
                    return true;
                }
                return false;
            }
            public int source() {
                return (int) (val >>> 32);
            }
            public int dest() {
                return (int) (val & 0xFFFF_FFFFL);
            }
        }
    }
}

@@ -1,23 +0,0 @@
# Clients

## Core Services

* [assistant-api](assistant-api/)
* [query-api](query-api/)
* [index-api](index-api/)

These are clients for the [core services](../services-core/), along with what models
are necessary for speaking to them. They each implement the abstract client classes from
[service-client](../common/service-client).

All that is necessary is to `@Inject` them into the constructor and then
requests can be sent.

**Note:** If you are looking for the public API, it's handled by the api service in [services-application/api-service](../services-application/api-service).

## MQ-API Process API

[process-mqapi](process-mqapi/) defines requests and inboxes for the message queue based API used
for interacting with processes.

See [libraries/message-queue](../libraries/message-queue) and [services-application/control-service](../services-core/control-service).

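To make the removed README's "`@Inject` them into the constructor" remark concrete, here is a minimal usage sketch rather than project code: `SearchFrontend` and `runSearch` are invented names, while `QueryClient.search(Context, QueryParams)` matches the (now removed) client shown earlier in this diff.

```java
// Minimal sketch, not project code: consuming a Guice-injected client.
import com.google.inject.Inject;

import nu.marginalia.client.Context;
import nu.marginalia.query.client.QueryClient;
import nu.marginalia.query.model.QueryParams;
import nu.marginalia.query.model.QueryResponse;

public class SearchFrontend {
    private final QueryClient queryClient;

    @Inject
    public SearchFrontend(QueryClient queryClient) {
        // Guice supplies the singleton client; the caller does no host/port wiring.
        this.queryClient = queryClient;
    }

    public QueryResponse runSearch(Context ctx, QueryParams params) {
        // Once injected, requests can be sent directly.
        return queryClient.search(ctx, params);
    }
}
```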
@@ -11,9 +11,10 @@ java {
    }
}

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
    implementation project(':code:common:service-discovery')
    implementation project(':code:common:service-client')
    implementation project(':code:common:db')
    implementation project(':code:common:model')

@@ -3,11 +3,10 @@ package nu.marginalia;

import nu.marginalia.service.ServiceHomeNotConfiguredException;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Stream;

@@ -28,14 +27,37 @@ public class WmsaHome {
    }

    public static Path getHomePath() {
        var retStr = Optional.ofNullable(System.getenv("WMSA_HOME")).orElseGet(WmsaHome::findDefaultHomePath);
        String[] possibleLocations = new String[] {
            System.getenv("WMSA_HOME"),
            System.getProperty("system.homePath"),
            "/var/lib/wmsa",
            "/wmsa"
        };

        var ret = Path.of(retStr);
        Optional<String> retStr = Stream.of(possibleLocations)
                .filter(Objects::nonNull)
                .map(Path::of)
                .filter(Files::isDirectory)
                .map(Path::toString)
                .findFirst();

        if (!Files.isDirectory(ret)) {
            throw new ServiceHomeNotConfiguredException("Could not find $WMSA_HOME, either set environment variable or ensure " + retStr + " exists");
        if (retStr.isEmpty()) {
            // Check if we are running in a test environment

            var testRoot = Stream.iterate(Paths.get("").toAbsolutePath(), f -> f != null && Files.exists(f), Path::getParent)
                    .filter(p -> Files.exists(p.resolve("run/env")))
                    .filter(p -> Files.exists(p.resolve("run/setup.sh")))
                    .map(p -> p.resolve("run"))
                    .findAny();

            return testRoot.orElseThrow(() -> new ServiceHomeNotConfiguredException("""
                    Could not find $WMSA_HOME, either set environment
                    variable, the 'system.homePath' property,
                    or ensure either /wmssa or /var/lib/wmsa exists
                    """));
        }

        var ret = Path.of(retStr.get());

        if (!Files.isDirectory(ret.resolve("model"))) {
            throw new ServiceHomeNotConfiguredException("You need to run 'run/setup.sh' to download models to run/ before this will work!");
@@ -44,22 +66,6 @@ public class WmsaHome {
        return ret;
    }

    private static String findDefaultHomePath() {

        // Assume this is a local developer and not a production system, since it would have WMSA_HOME set.
        // Developers probably have a "run/" somewhere upstream from cwd.
        //

        return Stream.iterate(Paths.get("").toAbsolutePath(), f -> f != null && Files.exists(f), Path::getParent)
                .filter(p -> Files.exists(p.resolve("run/env")))
                .filter(p -> Files.exists(p.resolve("run/setup.sh")))
                .map(p -> p.resolve("run"))
                .findAny()
                .orElse(Path.of("/var/lib/wmsa"))
                .toString();
    }

    public static Path getAdsDefinition() {
        return getHomePath().resolve("data").resolve("adblock.txt");
    }

@@ -2,7 +2,6 @@ package nu.marginalia.nodecfg;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.test.TestMigrationLoader;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
@@ -13,12 +12,7 @@ import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

import static org.junit.jupiter.api.Assertions.*;

@@ -26,6 +26,8 @@ configurations {
    flywayMigration.extendsFrom(implementation)
}

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
    implementation project(':code:common:model')

@@ -40,7 +42,6 @@ dependencies {

    implementation libs.trove

    implementation libs.rxjava
    implementation libs.bundles.mariadb
    flywayMigration 'org.flywaydb:flyway-mysql:10.0.1'

@@ -115,7 +115,6 @@ public class DomainRankingSetsService {
        }
    }

    /** Defines a domain ranking set, parameters for the ranking algorithms.
     *
     * @param name Key and name of the set
@@ -24,7 +24,7 @@ public class DomainTypes {
        BLOG,
        CRAWL,
        TEST
    };
    }

    private final Logger logger = LoggerFactory.getLogger(DomainTypes.class);

@@ -17,14 +17,14 @@ It's well documented and these are probably the only four tasks you'll ever need
If you are not running the system via docker, you need to provide alternative connection details than
the defaults (TODO: how?).

The migration files are in [resources/db/migration](src/main/resources/db/migration). The file name convention
The migration files are in [resources/db/migration](resources/db/migration). The file name convention
incorporates the project's cal-ver versioning; and are applied in lexicographical order.

    VYY_MM_v_nnn__description.sql

## Central Paths

* [migrations](src/main/resources/db/migration) - Flyway migrations
* [migrations](resources/db/migration) - Flyway migrations

## See Also

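The migration README above leaves the "how" of alternative connection details as a TODO. As a hedged illustration only (not the project's actual wiring), Flyway can be pointed at the same migration location programmatically; the JDBC URL, credentials, and database name below are placeholders.

```java
// Hedged illustration: applying Flyway migrations with explicit connection details.
// URL, user, password and schema name are placeholders, not Marginalia's configuration.
import org.flywaydb.core.Flyway;

public class MigrateSketch {
    public static void main(String[] args) {
        Flyway flyway = Flyway.configure()
                .dataSource("jdbc:mariadb://localhost:3306/WMSA_prod", "wmsa", "secret")
                .locations("classpath:db/migration") // the resources/db/migration files described above
                .load();

        // Applies pending V* migrations in lexicographical (cal-ver) order.
        flyway.migrate();
    }
}
```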
@@ -14,6 +14,8 @@ configurations {
    flywayMigration.extendsFrom(implementation)
}

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
    implementation project(':code:common:model')
    implementation project(':code:common:service')

@@ -1,15 +1,3 @@
## Domain Link Database

The domain link database contains information about links
between domains. It is a static in-memory database loaded
from a binary file.

* [DomainLinkDb](src/main/java/nu/marginalia/linkdb/DomainLinkDb.java)
* * [FileDomainLinkDb](src/main/java/nu/marginalia/linkdb/FileDomainLinkDb.java)
* * [SqlDomainLinkDb](src/main/java/nu/marginalia/linkdb/SqlDomainLinkDb.java)
* [DomainLinkDbWriter](src/main/java/nu/marginalia/linkdb/DomainLinkDbWriter.java)
* [DomainLinkDbLoader](src/main/java/nu/marginalia/linkdb/DomainLinkDbLoader.java)

## Document Database

The document database contains information about links,
@@ -21,10 +9,10 @@ is not in the MariaDB database is that this would make updates to
this information take effect in production immediately, even before
the information was searchable.

* [DocumentLinkDbWriter](src/main/java/nu/marginalia/linkdb/DocumentDbWriter.java)
* [DocumentLinkDbLoader](src/main/java/nu/marginalia/linkdb/DocumentDbReader.java)
* [DocumentLinkDbWriter](java/nu/marginalia/linkdb/docs/DocumentDbWriter.java)
* [DocumentLinkDbLoader](java/nu/marginalia/linkdb/docs/DocumentDbReader.java)

## See Also

These databases are constructed by the [loading-process](../../processes/loading-process), and consumed by the [index-service](../../services-core/index-service).
The database is constructed by the [loading-process](../../processes/loading-process), and consumed by the [index-service](../../services-core/index-service).

@@ -10,9 +10,10 @@ java {
    }
}

apply from: "$rootProject.projectDir/srcsets.gradle"

dependencies {
    implementation project(':code:common:service-discovery')
    implementation project(':code:common:service-client')
    implementation project(':code:libraries:big-string')
    implementation project(':code:libraries:braille-block-punch-cards')

@@ -28,7 +29,6 @@ dependencies {
    implementation libs.trove
    implementation libs.fastutil

    implementation libs.rxjava
    implementation libs.bundles.mariadb

    testImplementation libs.bundles.slf4j.test

@@ -127,7 +127,7 @@ public class EdgeDomain implements Serializable {
            ret.append(topDomain, 0, cutPoint);
        }

        if (!"".equals(subDomain) && !"www".equals(subDomain)) {
        if (!subDomain.isEmpty() && !"www".equals(subDomain)) {
            ret.append(":");
            ret.append(subDomain);
        }

@@ -32,7 +32,16 @@ public class EdgeUrl implements Serializable {
    }

    public EdgeUrl(String url) throws URISyntaxException {
        this(new URI(urlencodeFixer(url)));
        this(parseURI(url));
    }

    private static URI parseURI(String url) throws URISyntaxException {
        try {
            return new URI(urlencodeFixer(url));
        }
        catch (URISyntaxException ex) {
            throw new URISyntaxException(STR."Failed to parse URI '\{url}'", ex.getMessage());
        }
    }

    public static Optional<EdgeUrl> parse(@Nullable String url) {

@@ -5,6 +5,6 @@ public enum UrlIndexingState {
    OK,
    REDIRECT,
    DEAD,
    DISQUALIFIED;
    DISQUALIFIED

}

@@ -4,9 +4,9 @@ This package contains common models to the search engine

## Central Classes

* [EdgeDomain](src/main/java/nu/marginalia/model/EdgeDomain.java)
* [EdgeUrl](src/main/java/nu/marginalia/model/EdgeUrl.java)
* [DocumentMetadata](src/main/java/nu/marginalia/model/idx/DocumentMetadata.java)
* [DocumentFlags](src/main/java/nu/marginalia/model/idx/DocumentFlags.java)
* [WordMetadata](src/main/java/nu/marginalia/model/idx/WordMetadata.java)
* [WordFlags](src/main/java/nu/marginalia/model/idx/WordFlags.java)
* [EdgeDomain](java/nu/marginalia/model/EdgeDomain.java)
* [EdgeUrl](java/nu/marginalia/model/EdgeUrl.java)
* [DocumentMetadata](java/nu/marginalia/model/idx/DocumentMetadata.java)
* [DocumentFlags](java/nu/marginalia/model/idx/DocumentFlags.java)
* [WordMetadata](java/nu/marginalia/model/idx/WordMetadata.java)
* [WordFlags](java/nu/marginalia/model/idx/WordFlags.java)

Some files were not shown because too many files have changed in this diff.