(refac) Merge service-discovery and service modules
Also adds a few tests to the server/client code.
This commit is contained in:
parent
29bf473d74
commit
46423612e3
@ -14,7 +14,6 @@ java {
|
||||
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:model')
|
||||
|
||||
|
@ -1,8 +1,6 @@
|
||||
package nu.marginalia;
|
||||
|
||||
|
||||
import nu.marginalia.service.ServiceHomeNotConfiguredException;
|
||||
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.Paths;
|
||||
@ -50,7 +48,7 @@ public class WmsaHome {
|
||||
.map(p -> p.resolve("run"))
|
||||
.findAny();
|
||||
|
||||
return testRoot.orElseThrow(() -> new ServiceHomeNotConfiguredException("""
|
||||
return testRoot.orElseThrow(() -> new IllegalStateException("""
|
||||
Could not find $WMSA_HOME, either set environment
|
||||
variable, the 'system.homePath' property,
|
||||
or ensure either /wmssa or /var/lib/wmsa exists
|
||||
@ -60,7 +58,7 @@ public class WmsaHome {
|
||||
var ret = Path.of(retStr.get());
|
||||
|
||||
if (!Files.isDirectory(ret.resolve("model"))) {
|
||||
throw new ServiceHomeNotConfiguredException("You need to run 'run/setup.sh' to download models to run/ before this will work!");
|
||||
throw new IllegalStateException("You need to run 'run/setup.sh' to download models to run/ before this will work!");
|
||||
}
|
||||
|
||||
return ret;
|
||||
|
@ -13,7 +13,6 @@ java {
|
||||
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:libraries:big-string')
|
||||
implementation project(':code:libraries:braille-block-punch-cards')
|
||||
|
||||
|
@ -6,6 +6,6 @@ as shared models.
|
||||
* [db](db/) contains SQL code and some database-related utilities.
|
||||
* [config](config/) contains some `@Inject`ables.
|
||||
* [renderer](renderer/) contains utility code for rendering website templates.
|
||||
* [service](service/) is the shared base classes for main methods and web services.
|
||||
* [service-discovery](service-discovery) contains tools that lets the services find each other and communicate.
|
||||
* [service](service/) is the shared base classes for main methods and web services,
|
||||
including a service registry interface.
|
||||
* [process](process/) contains boiler plate for batch processes.
|
||||
|
@ -1,41 +0,0 @@
|
||||
plugins {
|
||||
id 'java'
|
||||
|
||||
}
|
||||
|
||||
repositories {
|
||||
mavenLocal()
|
||||
mavenCentral()
|
||||
|
||||
repositories {
|
||||
mavenCentral()
|
||||
maven { url 'https://jitpack.io' }
|
||||
}
|
||||
}
|
||||
|
||||
java {
|
||||
toolchain {
|
||||
languageVersion.set(JavaLanguageVersion.of(21))
|
||||
}
|
||||
}
|
||||
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
implementation libs.bundles.curator
|
||||
implementation libs.guice
|
||||
implementation libs.bundles.gson
|
||||
implementation libs.bundles.mariadb
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.notnull
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
testImplementation libs.bundles.junit
|
||||
testImplementation libs.mockito
|
||||
|
||||
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
|
||||
testImplementation 'org.testcontainers:mariadb:1.17.4'
|
||||
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
|
||||
testImplementation project(':code:functions:math:api')
|
||||
}
|
@ -1,149 +0,0 @@
|
||||
# Service Discovery
|
||||
|
||||
Contains classes for helping services discover each other,
|
||||
and managing connections between them.
|
||||
|
||||
## Service Registry
|
||||
|
||||
The service registry is a class that keeps track of the services
|
||||
that are currently running, and their connection information.
|
||||
|
||||
The service register implementation is based on [Zookeeper](https://zookeeper.apache.org/),
|
||||
which is a distributed coordination service. This lets services register
|
||||
themselves and announce their liveness, and then discover each other.
|
||||
|
||||
It supports multiple instances of a service running, and
|
||||
supports running the system bare-metal, where it will assign
|
||||
ports to the services from a range.
|
||||
|
||||
* REST services are registered on a per-node basis, and are always non-partitioned.
|
||||
* gRPC services are registered on a per-api basis, and can be partitioned
|
||||
or non-partitioned. This means that if a gRPC api is moved between nodes,
|
||||
the clients will not need to be reconfigured.
|
||||
|
||||
To be discoverable, the caller must first register their
|
||||
services:
|
||||
|
||||
```java
|
||||
// Register one or more services
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forRest(serviceId, nodeId),
|
||||
instanceUuid, // unique
|
||||
externalAddress); // bind-address
|
||||
|
||||
// Non-partitioned GRPC service
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forServiceDescriptor(descriptor, ServicePartition.any()),
|
||||
instanceUuid,
|
||||
externalAddress);
|
||||
|
||||
// Partitioned GRPC service
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forServiceDescriptor(descriptor, ServicePartition.partition(5)),
|
||||
instanceUuid,
|
||||
externalAddress);
|
||||
|
||||
// (+ any other services)
|
||||
```
|
||||
|
||||
Then, the caller must announce their instance. Before this is done,
|
||||
the service is not discoverable.
|
||||
|
||||
```java
|
||||
registry.announceInstance(instanceUUID);
|
||||
```
|
||||
|
||||
All of this is done automatically by the `Service` base class
|
||||
in the [service](../service/) module.
|
||||
|
||||
To discover a service, the caller can query the registry:
|
||||
|
||||
```java
|
||||
Set<InstanceAddress> endpoints = registry.getEndpoints(serviceKey);
|
||||
```
|
||||
|
||||
It's also possible to subscribe to changes in the registry, so that
|
||||
the caller can be notified when a service comes or goes, with `registry.registerMonitor()`.
|
||||
|
||||
However the `GrpcChannelPoolFactory` is a more convenient way to access the services,
|
||||
it will let the caller create a pool of channels to the services, and manage their
|
||||
lifecycle, listen to lifecycle notifications and so on.
|
||||
|
||||
## gRPC Channel Pool
|
||||
|
||||
From the [GrpcChannelPoolFactory](java/nu/marginalia/service/client/GrpcChannelPoolFactory.java), two types of channel pools can be created
|
||||
that are aware of the service registry:
|
||||
|
||||
* [GrpcMultiNodeChannelPool](java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java) - This pool permits 1-n style communication with partitioned services
|
||||
* [GrpcSingleNodeChannelPool](java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java) - This pool permits 1-1 style communication with non-partitioned services.
|
||||
if multiple instances are running, it will use one of them and fall back
|
||||
to another if the first is not available.
|
||||
|
||||
The pools can generate calls to the gRPC services, and will manage the lifecycle of the channels.
|
||||
|
||||
The API is designed to be simple to use, and will permit the caller to access the Stub interfaces
|
||||
for the services through a fluent API.
|
||||
|
||||
### Example Usage of the GrpcSingleNodeChannelPool
|
||||
|
||||
```java
|
||||
// create a pool for a non-partitioned service
|
||||
channelPool = factory.createSingle(
|
||||
ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any()),
|
||||
MathApiGrpc::newBlockingStub);
|
||||
|
||||
// blocking call
|
||||
Response response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.run(request);
|
||||
|
||||
// sequential blocking calls
|
||||
List<Response> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.runFor(request1, request2);
|
||||
|
||||
|
||||
// async call
|
||||
Future<Response> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.run(request);
|
||||
|
||||
// multiple async calls
|
||||
Future<List<Response>> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.runFor(request1, request2);
|
||||
```
|
||||
|
||||
### Example Usage of the GrpcSingleNodeChannelPool
|
||||
|
||||
```java
|
||||
// create a pool for a partitioned service
|
||||
channelPool = factory.createMulti(
|
||||
ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.multi()),
|
||||
MathApiGrpc::newBlockingStub);
|
||||
|
||||
// blocking call
|
||||
List<Response> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.run(request);
|
||||
|
||||
// async call
|
||||
Future<List<Response>> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.runEach(request);
|
||||
|
||||
// async call, will fail or succeed as a group
|
||||
Future<List<Response>> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.runAll(request1, request2);
|
||||
```
|
||||
|
||||
|
||||
### Central Classes
|
||||
|
||||
* [ServiceRegistryIf](java/nu/marginalia/service/discovery/ServiceRegistryIf.java)
|
||||
* [ZkServiceRegistry](java/nu/marginalia/service/discovery/ZkServiceRegistry.java)
|
@ -12,7 +12,6 @@ java {
|
||||
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:config')
|
||||
@ -25,11 +24,18 @@ dependencies {
|
||||
implementation libs.bundles.prometheus
|
||||
implementation libs.bundles.slf4j
|
||||
implementation libs.bucket4j
|
||||
implementation libs.notnull
|
||||
implementation libs.bundles.curator
|
||||
implementation libs.bundles.flyway
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
implementation libs.bundles.mariadb
|
||||
|
||||
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
|
||||
testImplementation 'org.testcontainers:mariadb:1.17.4'
|
||||
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
|
||||
testImplementation project(':code:libraries:test-helpers')
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
testImplementation libs.bundles.junit
|
||||
testImplementation libs.mockito
|
||||
|
@ -1,7 +1,6 @@
|
||||
package nu.marginalia.service;
|
||||
|
||||
import io.prometheus.client.hotspot.DefaultExports;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.service.id;
|
||||
package nu.marginalia.service;
|
||||
|
||||
public enum ServiceId {
|
||||
|
@ -3,7 +3,6 @@ package nu.marginalia.service;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import com.zaxxer.hikari.HikariDataSource;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -4,7 +4,7 @@ import com.google.inject.Inject;
|
||||
import com.google.inject.Singleton;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import nu.marginalia.service.NamedExecutorFactory;
|
||||
import nu.marginalia.util.NamedExecutorFactory;
|
||||
import nu.marginalia.service.NodeConfigurationWatcher;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.discovery.property.PartitionTraits;
|
||||
@ -57,6 +57,7 @@ public class GrpcChannelPoolFactory {
|
||||
.forAddress(route.host(), route.port())
|
||||
.executor(executor)
|
||||
.offloadExecutor(offloadExecutor)
|
||||
.idleTimeout(120, java.util.concurrent.TimeUnit.SECONDS)
|
||||
.usePlaintext()
|
||||
.build();
|
||||
|
@ -74,6 +74,14 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||
return true;
|
||||
}
|
||||
|
||||
// Mostly for testing
|
||||
public synchronized void stop() {
|
||||
for (var channel : channels.values()) {
|
||||
channel.closeHard();
|
||||
}
|
||||
channels.clear();
|
||||
}
|
||||
|
||||
private class ConnectionHolder implements Comparable<ConnectionHolder> {
|
||||
private final AtomicReference<ManagedChannel> channel = new AtomicReference<>();
|
||||
private final InstanceAddress address;
|
||||
@ -116,6 +124,12 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||
mc.shutdown();
|
||||
}
|
||||
}
|
||||
public void closeHard() {
|
||||
ManagedChannel mc = channel.getAndSet(null);
|
||||
if (mc != null) {
|
||||
mc.shutdownNow();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object o) {
|
@ -1,7 +1,7 @@
|
||||
package nu.marginalia.service.discovery.property;
|
||||
|
||||
import io.grpc.ServiceDescriptor;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
|
||||
public sealed interface ServiceKey<P extends ServicePartition> {
|
||||
String toPath();
|
@ -1,6 +1,6 @@
|
||||
package nu.marginalia.service.module;
|
||||
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
|
@ -2,7 +2,7 @@ package nu.marginalia.service.module;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import com.google.inject.name.Names;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.service;
|
||||
package nu.marginalia.service.module;
|
||||
|
||||
import com.google.inject.AbstractModule;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
@ -9,8 +9,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
/** Provides a Guice module for service discovery. */
|
||||
public class ServiceDiscoveryModule extends AbstractModule {
|
||||
|
@ -0,0 +1,59 @@
|
||||
package nu.marginalia.service.server;
|
||||
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.Server;
|
||||
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
|
||||
import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import nu.marginalia.util.NamedExecutorFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
public class GrpcServer {
|
||||
private final Server server;
|
||||
public GrpcServer(ServiceConfiguration config,
|
||||
ServiceRegistryIf serviceRegistry,
|
||||
ServicePartition partition,
|
||||
List<BindableService> grpcServices) throws Exception {
|
||||
|
||||
int port = serviceRegistry.requestPort(config.externalAddress(), new ServiceKey.Grpc<>("-", partition));
|
||||
|
||||
int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16);
|
||||
|
||||
// Start the gRPC server
|
||||
var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port))
|
||||
.executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads))
|
||||
.workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads)))
|
||||
.bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads)))
|
||||
.channelType(NioServerSocketChannel.class);
|
||||
|
||||
for (var grpcService : grpcServices) {
|
||||
var svc = grpcService.bindService();
|
||||
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forServiceDescriptor(svc.getServiceDescriptor(), partition),
|
||||
config.instanceUuid(),
|
||||
config.externalAddress()
|
||||
);
|
||||
|
||||
grpcServerBuilder.addService(svc);
|
||||
}
|
||||
server = grpcServerBuilder.build();
|
||||
}
|
||||
|
||||
public void start() throws IOException {
|
||||
server.start();
|
||||
}
|
||||
|
||||
public void stop() throws InterruptedException {
|
||||
server.shutdownNow();
|
||||
server.awaitTermination();
|
||||
}
|
||||
|
||||
}
|
@ -7,10 +7,10 @@ import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import io.prometheus.client.Counter;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.mq.inbox.*;
|
||||
import nu.marginalia.service.NamedExecutorFactory;
|
||||
import nu.marginalia.util.NamedExecutorFactory;
|
||||
import nu.marginalia.service.client.ServiceNotAvailableException;
|
||||
import nu.marginalia.service.discovery.property.*;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.server.mq.ServiceMqSubscription;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -23,11 +23,6 @@ import spark.Spark;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
public class Service {
|
||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||
@ -54,6 +49,7 @@ public class Service {
|
||||
|
||||
protected final MqInboxIf messageQueueInbox;
|
||||
private final int node;
|
||||
private GrpcServer grpcServer;
|
||||
|
||||
@SneakyThrows
|
||||
public Service(BaseServiceParams params,
|
||||
@ -134,30 +130,8 @@ public class Service {
|
||||
Spark.get("/internal/started", this::isInitialized);
|
||||
Spark.get("/internal/ready", this::isReady);
|
||||
|
||||
int port = params.serviceRegistry.requestPort(config.externalAddress(), new ServiceKey.Grpc<>("-", partition));
|
||||
|
||||
|
||||
int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16);
|
||||
|
||||
// Start the gRPC server
|
||||
var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port))
|
||||
.executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads))
|
||||
.workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads)))
|
||||
.bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads)))
|
||||
.channelType(NioServerSocketChannel.class);
|
||||
|
||||
for (var grpcService : grpcServices) {
|
||||
var svc = grpcService.bindService();
|
||||
|
||||
params.serviceRegistry.registerService(
|
||||
ServiceKey.forServiceDescriptor(svc.getServiceDescriptor(), partition),
|
||||
config.instanceUuid(),
|
||||
config.externalAddress()
|
||||
);
|
||||
|
||||
grpcServerBuilder.addService(svc);
|
||||
}
|
||||
grpcServerBuilder.build().start();
|
||||
grpcServer = new GrpcServer(config, serviceRegistry, partition, grpcServices);
|
||||
grpcServer.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.service;
|
||||
package nu.marginalia.util;
|
||||
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
|
@ -1,4 +1,4 @@
|
||||
package nu.marginalia.service;
|
||||
package nu.marginalia.util;
|
||||
|
||||
public class ServiceHomeNotConfiguredException extends RuntimeException {
|
||||
public ServiceHomeNotConfiguredException(String message) {
|
@ -55,4 +55,156 @@ The service should also be given a canonical name in the `ServiceId` enum.
|
||||
## Central Classes
|
||||
|
||||
* [MainClass](java/nu/marginalia/service/MainClass.java) bootstraps all executables
|
||||
* [Service](java/nu/marginalia/service/server/Service.java) base class for all services.
|
||||
* [Service](java/nu/marginalia/service/server/Service.java) base class for all services.
|
||||
|
||||
---
|
||||
|
||||
# Service Discovery
|
||||
|
||||
The module also contains classes for helping services discover each other,
|
||||
and managing connections between them.
|
||||
|
||||
## Service Registry
|
||||
|
||||
The service registry is a class that keeps track of the services
|
||||
that are currently running, and their connection information.
|
||||
|
||||
The service register implementation is based on [Zookeeper](https://zookeeper.apache.org/),
|
||||
which is a distributed coordination service. This lets services register
|
||||
themselves and announce their liveness, and then discover each other.
|
||||
|
||||
It supports multiple instances of a service running, and
|
||||
supports running the system bare-metal, where it will assign
|
||||
ports to the services from a range.
|
||||
|
||||
* REST services are registered on a per-node basis, and are always non-partitioned.
|
||||
* gRPC services are registered on a per-api basis, and can be partitioned
|
||||
or non-partitioned. This means that if a gRPC api is moved between nodes,
|
||||
the clients will not need to be reconfigured.
|
||||
|
||||
To be discoverable, the caller must first register their
|
||||
services:
|
||||
|
||||
```java
|
||||
// Register one or more services
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forRest(serviceId, nodeId),
|
||||
instanceUuid, // unique
|
||||
externalAddress); // bind-address
|
||||
|
||||
// Non-partitioned GRPC service
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forServiceDescriptor(descriptor, ServicePartition.any()),
|
||||
instanceUuid,
|
||||
externalAddress);
|
||||
|
||||
// Partitioned GRPC service
|
||||
serviceRegistry.registerService(
|
||||
ServiceKey.forServiceDescriptor(descriptor, ServicePartition.partition(5)),
|
||||
instanceUuid,
|
||||
externalAddress);
|
||||
|
||||
// (+ any other services)
|
||||
```
|
||||
|
||||
Then, the caller must announce their instance. Before this is done,
|
||||
the service is not discoverable.
|
||||
|
||||
```java
|
||||
registry.announceInstance(instanceUUID);
|
||||
```
|
||||
|
||||
All of this is done automatically by the `Service` base class
|
||||
in the [service](../service/) module.
|
||||
|
||||
To discover a service, the caller can query the registry:
|
||||
|
||||
```java
|
||||
Set<InstanceAddress> endpoints = registry.getEndpoints(serviceKey);
|
||||
```
|
||||
|
||||
It's also possible to subscribe to changes in the registry, so that
|
||||
the caller can be notified when a service comes or goes, with `registry.registerMonitor()`.
|
||||
|
||||
However the `GrpcChannelPoolFactory` is a more convenient way to access the services,
|
||||
it will let the caller create a pool of channels to the services, and manage their
|
||||
lifecycle, listen to lifecycle notifications and so on.
|
||||
|
||||
## gRPC Channel Pool
|
||||
|
||||
From the [GrpcChannelPoolFactory](java/nu/marginalia/service/client/GrpcChannelPoolFactory.java), two types of channel pools can be created
|
||||
that are aware of the service registry:
|
||||
|
||||
* [GrpcMultiNodeChannelPool](java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java) - This pool permits 1-n style communication with partitioned services
|
||||
* [GrpcSingleNodeChannelPool](java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java) - This pool permits 1-1 style communication with non-partitioned services.
|
||||
if multiple instances are running, it will use one of them and fall back
|
||||
to another if the first is not available.
|
||||
|
||||
The pools can generate calls to the gRPC services, and will manage the lifecycle of the channels.
|
||||
|
||||
The API is designed to be simple to use, and will permit the caller to access the Stub interfaces
|
||||
for the services through a fluent API.
|
||||
|
||||
### Example Usage of the GrpcSingleNodeChannelPool
|
||||
|
||||
```java
|
||||
// create a pool for a non-partitioned service
|
||||
channelPool = factory.createSingle(
|
||||
ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any()),
|
||||
MathApiGrpc::newBlockingStub);
|
||||
|
||||
// blocking call
|
||||
Response response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.run(request);
|
||||
|
||||
// sequential blocking calls
|
||||
List<Response> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.runFor(request1, request2);
|
||||
|
||||
|
||||
// async call
|
||||
Future<Response> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.run(request);
|
||||
|
||||
// multiple async calls
|
||||
Future<List<Response>> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.runFor(request1, request2);
|
||||
```
|
||||
|
||||
### Example Usage of the GrpcSingleNodeChannelPool
|
||||
|
||||
```java
|
||||
// create a pool for a partitioned service
|
||||
channelPool = factory.createMulti(
|
||||
ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.multi()),
|
||||
MathApiGrpc::newBlockingStub);
|
||||
|
||||
// blocking call
|
||||
List<Response> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.run(request);
|
||||
|
||||
// async call
|
||||
Future<List<Response>> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.runEach(request);
|
||||
|
||||
// async call, will fail or succeed as a group
|
||||
Future<List<Response>> response = channelPool
|
||||
.call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup)
|
||||
.async(myExecutor)
|
||||
.runAll(request1, request2);
|
||||
```
|
||||
|
||||
|
||||
### Central Classes
|
||||
|
||||
* [ServiceRegistryIf](java/nu/marginalia/service/discovery/ServiceRegistryIf.java)
|
||||
* [ZkServiceRegistry](java/nu/marginalia/service/discovery/ZkServiceRegistry.java)
|
@ -1,9 +1,9 @@
|
||||
package nu.marginalia.service.discovery;
|
||||
|
||||
import nu.marginalia.api.math.MathApiGrpc;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.test.TestApiGrpc;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.ExponentialBackoffRetry;
|
||||
import org.junit.jupiter.api.*;
|
||||
@ -94,7 +94,7 @@ class ZkServiceRegistryTest {
|
||||
var registry2 = createRegistry();
|
||||
|
||||
var key1 = ServiceKey.forRest(ServiceId.Search, 0);
|
||||
var key2 = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any());
|
||||
var key2 = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.any());
|
||||
|
||||
var endpoint1 = registry1.registerService(key1, uuid1, "127.0.0.1");
|
||||
var endpoint2 = registry2.registerService(key2, uuid2, "127.0.0.2");
|
||||
@ -123,7 +123,7 @@ class ZkServiceRegistryTest {
|
||||
var registry1 = createRegistry();
|
||||
var registry2 = createRegistry();
|
||||
|
||||
var key = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any());
|
||||
var key = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.any());
|
||||
|
||||
var endpoint1 = registry1.registerService(key, uuid1, "127.0.0.1");
|
||||
var endpoint2 = registry2.registerService(key, uuid2, "127.0.0.2");
|
||||
@ -149,8 +149,8 @@ class ZkServiceRegistryTest {
|
||||
var registry1 = createRegistry();
|
||||
var registry2 = createRegistry();
|
||||
|
||||
var key1 = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.partition(1));
|
||||
var key2 = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.partition(2));
|
||||
var key1 = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.partition(1));
|
||||
var key2 = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.partition(2));
|
||||
|
||||
var endpoint1 = registry1.registerService(key1, uuid1, "127.0.0.1");
|
||||
var endpoint2 = registry2.registerService(key2, uuid2, "127.0.0.2");
|
@ -0,0 +1,160 @@
|
||||
package nu.marginalia.service.server;
|
||||
|
||||
import io.grpc.BindableService;
|
||||
import io.grpc.stub.StreamObserver;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.client.GrpcChannelPoolFactory;
|
||||
import nu.marginalia.service.client.GrpcSingleNodeChannelPool;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.discovery.property.ServiceEndpoint;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import nu.marginalia.test.RpcInteger;
|
||||
import nu.marginalia.test.TestApiGrpc;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import java.util.*;
|
||||
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
|
||||
public class GrpcServerTest {
|
||||
|
||||
List<GrpcServer> servers = new ArrayList<>();
|
||||
List<GrpcSingleNodeChannelPool<?>> clients = new ArrayList<>();
|
||||
|
||||
@AfterEach
|
||||
public void close() {
|
||||
for (var server : servers) {
|
||||
try {
|
||||
server.stop();
|
||||
} catch (InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
for (var client : clients) {
|
||||
client.stop();
|
||||
}
|
||||
servers.clear();
|
||||
clients.clear();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStartStopServer() throws Exception {
|
||||
var server = createServerOnPort(randomHighPort(), UUID.randomUUID(), new TestGrpcService());
|
||||
|
||||
server.start();
|
||||
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClient() throws Exception {
|
||||
int port = randomHighPort();
|
||||
UUID serverUUID = UUID.randomUUID();
|
||||
|
||||
var server = createServerOnPort(port, serverUUID, new TestGrpcService());
|
||||
server.start();
|
||||
|
||||
var mockRegistry = Mockito.mock(ServiceRegistryIf.class);
|
||||
when(mockRegistry.getEndpoints(any())).thenReturn(
|
||||
Set.of(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID)));
|
||||
|
||||
var client = createClient(mockRegistry);
|
||||
client.onChange();
|
||||
|
||||
var ret = client.call(TestApiGrpc.TestApiBlockingStub::increment)
|
||||
.run(RpcInteger.newBuilder().setValue(1).build());
|
||||
Assertions.assertEquals(2, ret.getValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClientConnSwitch() throws Exception {
|
||||
int port = randomHighPort();
|
||||
UUID serverUUID1 = UUID.randomUUID();
|
||||
UUID serverUUID2 = UUID.randomUUID();
|
||||
|
||||
var server1 = createServerOnPort(port, serverUUID1, new TestGrpcService());
|
||||
|
||||
server1.start();
|
||||
|
||||
Set<ServiceEndpoint.InstanceAddress> endpoints = new HashSet<>();
|
||||
endpoints.add(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID1));
|
||||
|
||||
var mockRegistry = Mockito.mock(ServiceRegistryIf.class);
|
||||
when(mockRegistry.getEndpoints(any())).thenReturn(endpoints);
|
||||
|
||||
var client = createClient(mockRegistry);
|
||||
|
||||
var ret = client.call(TestApiGrpc.TestApiBlockingStub::increment)
|
||||
.run(RpcInteger.newBuilder().setValue(1).build());
|
||||
System.out.println(ret);
|
||||
|
||||
server1.stop();
|
||||
var server2 = createServerOnPort(port + 1, serverUUID2, new TestGrpcService());
|
||||
endpoints.add(new ServiceEndpoint("127.0.0.1", port + 1).asInstance(serverUUID2));
|
||||
server2.start();
|
||||
client.onChange();
|
||||
|
||||
ret = client.call(TestApiGrpc.TestApiBlockingStub::increment)
|
||||
.run(RpcInteger.newBuilder().setValue(1).build());
|
||||
System.out.println(ret);
|
||||
|
||||
endpoints.clear();
|
||||
endpoints.add(new ServiceEndpoint("127.0.0.1", port + 1).asInstance(serverUUID2));
|
||||
client.onChange();
|
||||
|
||||
ret = client.call(TestApiGrpc.TestApiBlockingStub::increment)
|
||||
.run(RpcInteger.newBuilder().setValue(1).build());
|
||||
System.out.println(ret);
|
||||
|
||||
}
|
||||
|
||||
private GrpcServer createServerOnPort(int port, UUID uuid, BindableService... services) throws Exception {
|
||||
var mockRegistry = Mockito.mock(ServiceRegistryIf.class);
|
||||
when(mockRegistry.requestPort(any(), any())).thenReturn(port);
|
||||
|
||||
var config = new ServiceConfiguration(ServiceId.Api, 1,
|
||||
"127.0.0.1", "127.0.0.1", -1, uuid);
|
||||
|
||||
var server = new GrpcServer(config, mockRegistry, ServicePartition.any(), List.of(services));
|
||||
servers.add(server);
|
||||
return server;
|
||||
}
|
||||
|
||||
private GrpcSingleNodeChannelPool<TestApiGrpc.TestApiBlockingStub> createClient(ServiceRegistryIf mockRegistry) {
|
||||
var client = new GrpcChannelPoolFactory(null, mockRegistry).createSingle(
|
||||
ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.any()),
|
||||
TestApiGrpc::newBlockingStub);
|
||||
clients.add(client);
|
||||
return client;
|
||||
}
|
||||
|
||||
private int randomHighPort() {
|
||||
return 12000 + (int) (Math.random() * 1000);
|
||||
}
|
||||
|
||||
private static class TestGrpcService extends TestApiGrpc.TestApiImplBase {
|
||||
|
||||
@Override
|
||||
public void increment(RpcInteger request, StreamObserver<RpcInteger> obs) {
|
||||
obs.onNext(RpcInteger.newBuilder().setValue(request.getValue() + 1).build());
|
||||
obs.onCompleted();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void count(RpcInteger request, StreamObserver<RpcInteger> obs) {
|
||||
for (int i = 0; i < request.getValue(); i++) {
|
||||
obs.onNext(RpcInteger.newBuilder().setValue(i).build());
|
||||
}
|
||||
obs.onCompleted();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
@ -30,18 +30,17 @@ dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:index:api')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
implementation libs.prometheus
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.protobuf
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.gson
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
|
@ -14,7 +14,7 @@ import nu.marginalia.service.client.GrpcMultiNodeChannelPool;
|
||||
import nu.marginalia.service.discovery.ServiceRegistryIf;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import nu.marginalia.service.discovery.property.ServicePartition;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.storage.model.FileStorageId;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
|
@ -28,7 +28,6 @@ dependencies {
|
||||
implementation project(':code:common:linkdb')
|
||||
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':third-party:commons-codec')
|
||||
|
||||
@ -57,7 +56,6 @@ dependencies {
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.trove
|
||||
implementation libs.protobuf
|
||||
implementation libs.zstd
|
||||
implementation libs.jsoup
|
||||
implementation libs.commons.io
|
||||
|
@ -19,7 +19,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:service')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
@ -27,8 +27,7 @@ dependencies {
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.gson
|
||||
implementation libs.protobuf
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.bundles.grpc
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
|
@ -21,7 +21,6 @@ dependencies {
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':code:libraries:geo-ip')
|
||||
|
||||
|
@ -19,7 +19,6 @@ dependencies {
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
|
@ -19,7 +19,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:service')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
@ -27,9 +27,8 @@ dependencies {
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.gson
|
||||
implementation libs.protobuf
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.roaringbitmap
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.bundles.grpc
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
|
@ -21,7 +21,6 @@ dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:linkdb')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
|
@ -19,7 +19,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:service')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
@ -27,8 +27,7 @@ dependencies {
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.gson
|
||||
implementation libs.protobuf
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.bundles.grpc
|
||||
|
||||
testImplementation libs.bundles.slf4j.test
|
||||
|
@ -21,8 +21,8 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:index:query')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation libs.bundles.slf4j
|
||||
|
||||
@ -30,8 +30,7 @@ dependencies {
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.gson
|
||||
implementation libs.protobuf
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.fastutil
|
||||
|
||||
|
@ -17,7 +17,6 @@ dependencies {
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':code:functions:search-query:api')
|
||||
|
||||
|
@ -14,7 +14,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:functions:search-query:api')
|
||||
|
||||
@ -23,7 +23,7 @@ dependencies {
|
||||
implementation libs.prometheus
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.protobuf
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.fastutil
|
||||
implementation libs.javax.annotation
|
||||
implementation libs.bundles.gson
|
||||
|
@ -5,7 +5,7 @@ import com.google.inject.Singleton;
|
||||
import com.google.inject.name.Named;
|
||||
import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
|
@ -27,7 +27,6 @@ dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:linkdb')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':code:functions:search-query:api')
|
||||
|
||||
|
@ -15,7 +15,7 @@ import nu.marginalia.process.control.FakeProcessHeartbeat;
|
||||
import nu.marginalia.process.control.ProcessHeartbeat;
|
||||
import nu.marginalia.index.domainrankings.DomainRankings;
|
||||
import nu.marginalia.service.control.*;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfiguration;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
|
@ -1,5 +1,7 @@
|
||||
plugins {
|
||||
id 'java'
|
||||
|
||||
id "com.google.protobuf" version "0.9.4"
|
||||
}
|
||||
|
||||
java {
|
||||
@ -8,9 +10,12 @@ java {
|
||||
}
|
||||
}
|
||||
|
||||
apply from: "$rootProject.projectDir/protobuf.gradle"
|
||||
apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
|
||||
dependencies {
|
||||
implementation libs.bundles.protobuf
|
||||
implementation libs.bundles.grpc
|
||||
implementation libs.bundles.slf4j
|
||||
implementation libs.bundles.mariadb
|
||||
implementation libs.bundles.slf4j.test
|
||||
|
15
code/libraries/test-helpers/src/main/protobuf/testapi.proto
Normal file
15
code/libraries/test-helpers/src/main/protobuf/testapi.proto
Normal file
@ -0,0 +1,15 @@
|
||||
syntax="proto3";
|
||||
package nu.marginalia.test;
|
||||
|
||||
option java_package="nu.marginalia.test";
|
||||
option java_multiple_files=true;
|
||||
|
||||
/* Dummy API for testing gRPC, service discovery, similar things */
|
||||
service TestApi {
|
||||
rpc increment(RpcInteger) returns (RpcInteger) {}
|
||||
rpc count(RpcInteger) returns (stream RpcInteger) {}
|
||||
}
|
||||
|
||||
message RpcInteger {
|
||||
int32 value = 1;
|
||||
}
|
@ -21,7 +21,6 @@ dependencies {
|
||||
implementation project(':code:common:process')
|
||||
implementation project(':code:libraries:big-string')
|
||||
implementation project(':code:index:api')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:features-crawl:content-type')
|
||||
implementation project(':code:libraries:language-processing')
|
||||
implementation project(':third-party:parquet-floor')
|
||||
|
@ -35,7 +35,6 @@ dependencies {
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:libraries:blocking-thread-pool')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':code:libraries:guarded-regex')
|
||||
implementation project(':code:libraries:easy-lsh')
|
||||
|
@ -31,7 +31,6 @@ dependencies {
|
||||
implementation project(':code:libraries:blocking-thread-pool')
|
||||
implementation project(':code:index:api')
|
||||
implementation project(':code:process-mqapi')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:libraries:language-processing')
|
||||
implementation project(':code:libraries:easy-lsh')
|
||||
|
@ -27,7 +27,6 @@ dependencies {
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:linkdb')
|
||||
implementation project(':code:index:index-journal')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
|
@ -23,7 +23,6 @@ dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:process')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:functions:link-graph:api')
|
||||
|
||||
|
@ -10,7 +10,7 @@ import nu.marginalia.model.EdgeDomain;
|
||||
import nu.marginalia.process.control.ProcessHeartbeat;
|
||||
import nu.marginalia.process.control.ProcessHeartbeatImpl;
|
||||
import nu.marginalia.service.ProcessMainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -27,7 +27,6 @@ dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:functions:search-query:api')
|
||||
implementation project(':code:index:query')
|
||||
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -27,7 +27,6 @@ dependencies {
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:renderer')
|
||||
implementation project(':code:features-search:screenshots')
|
||||
implementation project(':code:features-search:random-websites')
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -26,7 +26,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:renderer')
|
||||
|
||||
implementation project(':code:features-search:random-websites')
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -48,7 +48,6 @@ dependencies {
|
||||
|
||||
|
||||
implementation project(':code:index:api')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:renderer')
|
||||
|
||||
implementation project(':code:features-search:screenshots')
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -34,7 +34,6 @@ dependencies {
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':code:features-search:screenshots')
|
||||
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -32,7 +32,6 @@ dependencies {
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:renderer')
|
||||
implementation project(':code:libraries:message-queue')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:functions:search-query:api')
|
||||
implementation project(':code:execution:api')
|
||||
implementation project(':code:index:api')
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -1,165 +0,0 @@
|
||||
package nu.marginalia.control.actor.rebalance;
|
||||
|
||||
public class RebalanceActor {
|
||||
/**
|
||||
// States
|
||||
|
||||
public static final String INIT = "INIT";
|
||||
public static final String END = "END";
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(RebalanceActor.class);
|
||||
|
||||
private final NodeConfigurationService nodeConfigurationService;
|
||||
private final MqPersistence mqPersistence;
|
||||
private final HikariDataSource dataSource;
|
||||
private final Gson gson = GsonFactory.get();
|
||||
@Override
|
||||
public String describe() {
|
||||
return "Rebalances crawl data among the nodes";
|
||||
}
|
||||
|
||||
@Inject
|
||||
public RebalanceActor(ActorStateFactory stateFactory,
|
||||
NodeConfigurationService nodeConfigurationService,
|
||||
MqPersistence mqPersistence, HikariDataSource dataSource)
|
||||
{
|
||||
super(stateFactory);
|
||||
this.nodeConfigurationService = nodeConfigurationService;
|
||||
this.mqPersistence = mqPersistence;
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
@ActorState(name= INIT, next = END, resume = ActorResumeBehavior.ERROR,
|
||||
description = "Rebalance!")
|
||||
public void doIt() throws Exception {
|
||||
var populations = getNodePopulations();
|
||||
|
||||
if (populations.size() <= 1) {
|
||||
transition(END);
|
||||
}
|
||||
|
||||
int average = (int) populations.stream().mapToInt(pop -> pop.count).average().orElse(0);
|
||||
int tolerance = average / 10;
|
||||
|
||||
PriorityQueue<Sur> surplusList = new PriorityQueue<>();
|
||||
PriorityQueue<Def> deficitList = new PriorityQueue<>();
|
||||
|
||||
populations.forEach(pop -> {
|
||||
int delta = pop.count - average;
|
||||
if (delta - tolerance > 0) {
|
||||
surplusList.add(new Sur(pop.node, delta));
|
||||
}
|
||||
else if (delta + tolerance < 0) {
|
||||
deficitList.add(new Def(pop.node, -delta));
|
||||
}
|
||||
});
|
||||
|
||||
List<Give> actions = new ArrayList<>();
|
||||
|
||||
while (!surplusList.isEmpty() && !deficitList.isEmpty()) {
|
||||
var sur = surplusList.poll();
|
||||
var def = deficitList.poll();
|
||||
|
||||
assert (sur.n != def.n);
|
||||
|
||||
int amount = Math.min(sur.c, def.c);
|
||||
actions.add(new Give(sur.n, def.n, amount));
|
||||
|
||||
if (sur.c - amount > tolerance) {
|
||||
surplusList.add(new Sur(sur.n, sur.c - amount));
|
||||
}
|
||||
if (def.c - amount > tolerance) {
|
||||
deficitList.add(new Def(def.n, def.c - amount));
|
||||
}
|
||||
}
|
||||
|
||||
for (var action : actions) {
|
||||
var outbox = new MqOutbox(mqPersistence, "executor-service", action.dest, getClass().getSimpleName(), 0, UUID.randomUUID());
|
||||
var msg = outbox.send("TRANSFER-DOMAINS",
|
||||
gson.toJson(Map.of("sourceNode", action.donor, "count", action.c)));
|
||||
if (msg.state() != MqMessageState.OK) {
|
||||
logger.error("ERROR! {}", msg);
|
||||
}
|
||||
outbox.stop();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private List<Pop> getNodePopulations() throws SQLException {
|
||||
Map<Integer, Integer> ret = new HashMap<>();
|
||||
|
||||
try (var conn = dataSource.getConnection();
|
||||
var query = conn.prepareStatement("""
|
||||
SELECT NODE_AFFINITY, COUNT(*)
|
||||
FROM EC_DOMAIN
|
||||
WHERE NODE_AFFINITY > 0
|
||||
GROUP BY NODE_AFFINITY
|
||||
""")) {
|
||||
var rs = query.executeQuery();
|
||||
while (rs.next()) {
|
||||
ret.put(rs.getInt(1), rs.getInt(2));
|
||||
}
|
||||
}
|
||||
|
||||
for (var node : nodeConfigurationService.getAll()) {
|
||||
if (isNodeExcluded(node)) {
|
||||
ret.remove(node.node());
|
||||
} else {
|
||||
ret.putIfAbsent(node.node(), 0);
|
||||
}
|
||||
}
|
||||
|
||||
return ret.entrySet().stream().map(e -> new Pop(e.getKey(), e.getValue())).toList();
|
||||
}
|
||||
|
||||
private boolean isNodeExcluded(NodeConfiguration node) {
|
||||
return node.disabled();
|
||||
}
|
||||
|
||||
//* 1. calculate sizes for each node using db
|
||||
//
|
||||
//2. rebalance
|
||||
//
|
||||
//-- find average
|
||||
//-- calculate surplus and deficit, with a NN% tolerance
|
||||
//-- create instructions for redistribution
|
||||
//
|
||||
//3. instruct each executor to transfer data:
|
||||
//
|
||||
//-- transfer domain data
|
||||
//-- append to receiver crawler log
|
||||
//-- instruct donor to delete file
|
||||
//
|
||||
//4. regenerate crawler logs based on present files on all donor nodes * /
|
||||
|
||||
public record Sur(int n, int c) implements Comparable<Sur> {
|
||||
@Override
|
||||
public int compareTo(@NotNull RebalanceActor.Sur o) {
|
||||
int d = Integer.compare(o.c, c);
|
||||
if (d != 0)
|
||||
return d;
|
||||
|
||||
return Integer.compare(n, o.n);
|
||||
}
|
||||
}
|
||||
public record Def(int n, int c) implements Comparable<Def> {
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull RebalanceActor.Def o) {
|
||||
int d = Integer.compare(o.c, c);
|
||||
if (d != 0)
|
||||
return d;
|
||||
|
||||
return Integer.compare(n, o.n);
|
||||
}
|
||||
}
|
||||
|
||||
public record Populations(List<Pop> pops) {
|
||||
}
|
||||
public record Pop(int node, int count) {
|
||||
|
||||
}
|
||||
public record Give(int donor, int dest, int c) {
|
||||
|
||||
} */
|
||||
}
|
@ -15,7 +15,7 @@ import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.nodecfg.model.NodeConfiguration;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.executor.client.ExecutorClient;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.storage.model.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -12,7 +12,7 @@ import nu.marginalia.mq.MessageQueueFactory;
|
||||
import nu.marginalia.mq.outbox.MqOutbox;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.service.control.ServiceEventLog;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.storage.FileStorageService;
|
||||
import nu.marginalia.storage.model.FileStorageType;
|
||||
import spark.Request;
|
||||
|
@ -40,7 +40,6 @@ dependencies {
|
||||
implementation project(':code:common:linkdb')
|
||||
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
implementation project(':third-party:commons-codec')
|
||||
|
||||
@ -72,7 +71,6 @@ dependencies {
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.trove
|
||||
implementation libs.protobuf
|
||||
implementation libs.zstd
|
||||
implementation libs.jsoup
|
||||
implementation libs.commons.io
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -26,7 +26,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle"
|
||||
dependencies {
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:common:model')
|
||||
implementation project(':code:common:db')
|
||||
implementation project(':code:common:linkdb')
|
||||
@ -37,8 +36,6 @@ dependencies {
|
||||
implementation project(':code:functions:search-query:api')
|
||||
implementation project(':code:index:api')
|
||||
|
||||
implementation project(':code:common:service-discovery')
|
||||
|
||||
testImplementation project(path: ':code:services-core:control-service')
|
||||
testImplementation project(':code:common:process')
|
||||
|
||||
@ -47,7 +44,6 @@ dependencies {
|
||||
implementation libs.prometheus
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.protobuf
|
||||
implementation libs.bundles.httpcomponents
|
||||
implementation libs.roaringbitmap
|
||||
implementation libs.snakeyaml
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -31,7 +31,6 @@ dependencies {
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:renderer')
|
||||
implementation project(':code:index:api')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:index:query')
|
||||
|
||||
implementation project(':code:functions:search-query')
|
||||
@ -46,7 +45,6 @@ dependencies {
|
||||
implementation libs.prometheus
|
||||
implementation libs.notnull
|
||||
implementation libs.guice
|
||||
implementation libs.protobuf
|
||||
implementation libs.bundles.mariadb
|
||||
implementation libs.bundles.grpc
|
||||
|
||||
|
@ -4,8 +4,8 @@ import com.google.inject.Guice;
|
||||
import com.google.inject.Inject;
|
||||
import com.google.inject.Injector;
|
||||
import nu.marginalia.service.MainClass;
|
||||
import nu.marginalia.service.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceDiscoveryModule;
|
||||
import nu.marginalia.service.ServiceId;
|
||||
import nu.marginalia.service.module.ServiceConfigurationModule;
|
||||
import nu.marginalia.service.module.DatabaseModule;
|
||||
import nu.marginalia.service.server.Initialization;
|
||||
|
@ -30,7 +30,6 @@ dependencies {
|
||||
implementation project(':code:common:config')
|
||||
implementation project(':code:common:process')
|
||||
implementation project(':code:common:service')
|
||||
implementation project(':code:common:service-discovery')
|
||||
implementation project(':code:libraries:language-processing')
|
||||
implementation project(':code:libraries:term-frequency-dict')
|
||||
implementation project(':code:libraries:big-string')
|
||||
|
@ -70,7 +70,6 @@ include 'code:features-crawl:content-type'
|
||||
|
||||
include 'code:process-mqapi'
|
||||
|
||||
include 'code:common:service-discovery'
|
||||
include 'code:common:db'
|
||||
include 'code:common:linkdb'
|
||||
include 'code:common:service'
|
||||
@ -224,6 +223,7 @@ dependencyResolutionManagement {
|
||||
bundle('selenium', ['selenium.chrome', 'selenium.java'])
|
||||
bundle('handlebars', ['handlebars', 'handlebars.markdown'])
|
||||
bundle('grpc', ['protobuf', 'grpc-stub', 'grpc-protobuf', 'grpc-netty'])
|
||||
bundle('protobuf', ['protobuf', 'javax.annotation'])
|
||||
bundle('gson', ['gson', 'gson-type-adapter'])
|
||||
bundle('httpcomponents', ['httpcomponents.core', 'httpcomponents.client'])
|
||||
bundle('parquet', ['parquet-column', 'parquet-hadoop'])
|
||||
|
Loading…
Reference in New Issue
Block a user