From 46423612e302a1e5541763f7195165716b70549e Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sun, 3 Mar 2024 10:49:23 +0100 Subject: [PATCH] (refac) Merge service-discovery and service modules Also adds a few tests to the server/client code. --- code/common/config/build.gradle | 1 - .../config/java/nu/marginalia/WmsaHome.java | 6 +- code/common/model/build.gradle | 1 - code/common/readme.md | 4 +- code/common/service-discovery/build.gradle | 41 ----- code/common/service-discovery/readme.md | 149 ---------------- code/common/service/build.gradle | 8 +- .../java/nu/marginalia/service/MainClass.java | 1 - .../service/NodeConfigurationWatcher.java | 0 .../nu/marginalia/service}/ServiceId.java | 2 +- .../marginalia/service/ServiceMonitors.java | 1 - .../client/GrpcChannelPoolFactory.java | 3 +- .../client/GrpcMultiNodeChannelPool.java | 0 .../client/GrpcSingleNodeChannelPool.java | 14 ++ .../client/ServiceNotAvailableException.java | 0 .../service/discovery/ServiceRegistryIf.java | 0 .../service/discovery/ZkServiceRegistry.java | 0 .../monitor/ServiceChangeMonitor.java | 0 .../discovery/monitor/ServiceMonitorIf.java | 0 .../discovery/property/PartitionTraits.java | 0 .../discovery/property/ServiceEndpoint.java | 0 .../discovery/property/ServiceKey.java | 2 +- .../discovery/property/ServicePartition.java | 0 .../service/module/ServiceConfiguration.java | 2 +- .../module/ServiceConfigurationModule.java | 2 +- .../module}/ServiceDiscoveryModule.java | 4 +- .../marginalia/service/server/GrpcServer.java | 59 +++++++ .../nu/marginalia/service/server/Service.java | 36 +--- .../util}/NamedExecutorFactory.java | 2 +- .../ServiceHomeNotConfiguredException.java | 2 +- code/common/service/readme.md | 154 +++++++++++++++- .../discovery/ZkServiceRegistryTest.java | 12 +- .../service/server/GrpcServerTest.java | 160 +++++++++++++++++ code/execution/api/build.gradle | 5 +- .../executor/client/ExecutorClient.java | 2 +- code/execution/build.gradle | 2 - code/functions/domain-info/api/build.gradle | 5 +- code/functions/domain-info/build.gradle | 1 - .../link-graph/aggregate/build.gradle | 1 - code/functions/link-graph/api/build.gradle | 5 +- .../link-graph/partition/build.gradle | 1 - code/functions/math/api/build.gradle | 5 +- code/functions/search-query/api/build.gradle | 5 +- code/functions/search-query/build.gradle | 1 - code/index/api/build.gradle | 4 +- .../marginalia/index/api/IndexMqClient.java | 2 +- code/index/build.gradle | 1 - ...ndexQueryServiceIntegrationTestModule.java | 2 +- code/libraries/test-helpers/build.gradle | 5 + .../src/main/protobuf/testapi.proto | 15 ++ .../crawling-model/build.gradle | 1 - .../processes/converting-process/build.gradle | 1 - code/processes/crawling-process/build.gradle | 1 - code/processes/loading-process/build.gradle | 1 - .../build.gradle | 1 - .../WebsiteAdjacenciesCalculator.java | 2 +- .../api-service/build.gradle | 1 - .../java/nu/marginalia/api/ApiMain.java | 4 +- .../dating-service/build.gradle | 1 - .../java/nu/marginalia/dating/DatingMain.java | 4 +- .../explorer-service/build.gradle | 1 - .../nu/marginalia/explorer/ExplorerMain.java | 4 +- .../search-service/build.gradle | 1 - .../java/nu/marginalia/search/SearchMain.java | 4 +- .../assistant-service/build.gradle | 1 - .../marginalia/assistant/AssistantMain.java | 4 +- .../control-service/build.gradle | 1 - .../nu/marginalia/control/ControlMain.java | 4 +- .../actor/rebalance/RebalanceActor.java | 165 ------------------ .../control/node/svc/ControlNodeService.java | 2 +- .../sys/svc/ControlSysActionsService.java | 2 +- .../executor-service/build.gradle | 2 - .../nu/marginalia/executor/ExecutorMain.java | 4 +- code/services-core/index-service/build.gradle | 4 - .../java/nu/marginalia/index/IndexMain.java | 4 +- code/services-core/query-service/build.gradle | 2 - .../java/nu/marginalia/query/QueryMain.java | 4 +- code/tools/experiment-runner/build.gradle | 1 - settings.gradle | 2 +- 79 files changed, 474 insertions(+), 483 deletions(-) delete mode 100644 code/common/service-discovery/build.gradle delete mode 100644 code/common/service-discovery/readme.md rename code/common/{service-discovery => service}/java/nu/marginalia/service/NodeConfigurationWatcher.java (100%) rename code/common/{service-discovery/java/nu/marginalia/service/id => service/java/nu/marginalia/service}/ServiceId.java (95%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/ServiceMonitors.java (99%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java (95%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java (96%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/client/ServiceNotAvailableException.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/ServiceRegistryIf.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/ZkServiceRegistry.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/monitor/ServiceChangeMonitor.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/monitor/ServiceMonitorIf.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/property/PartitionTraits.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/property/ServiceEndpoint.java (100%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/property/ServiceKey.java (98%) rename code/common/{service-discovery => service}/java/nu/marginalia/service/discovery/property/ServicePartition.java (100%) rename code/common/{service-discovery/java/nu/marginalia/service => service/java/nu/marginalia/service/module}/ServiceDiscoveryModule.java (96%) create mode 100644 code/common/service/java/nu/marginalia/service/server/GrpcServer.java rename code/common/{service-discovery/java/nu/marginalia/service => service/java/nu/marginalia/util}/NamedExecutorFactory.java (97%) rename code/common/{service-discovery/java/nu/marginalia/service => service/java/nu/marginalia/util}/ServiceHomeNotConfiguredException.java (84%) rename code/common/{service-discovery => service}/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java (94%) create mode 100644 code/common/service/test/nu/marginalia/service/server/GrpcServerTest.java create mode 100644 code/libraries/test-helpers/src/main/protobuf/testapi.proto delete mode 100644 code/services-core/control-service/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java diff --git a/code/common/config/build.gradle b/code/common/config/build.gradle index 66a984d4..0ceb00ae 100644 --- a/code/common/config/build.gradle +++ b/code/common/config/build.gradle @@ -14,7 +14,6 @@ java { apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:service-discovery') implementation project(':code:common:db') implementation project(':code:common:model') diff --git a/code/common/config/java/nu/marginalia/WmsaHome.java b/code/common/config/java/nu/marginalia/WmsaHome.java index 122f6211..b61ee4dd 100644 --- a/code/common/config/java/nu/marginalia/WmsaHome.java +++ b/code/common/config/java/nu/marginalia/WmsaHome.java @@ -1,8 +1,6 @@ package nu.marginalia; -import nu.marginalia.service.ServiceHomeNotConfiguredException; - import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -50,7 +48,7 @@ public class WmsaHome { .map(p -> p.resolve("run")) .findAny(); - return testRoot.orElseThrow(() -> new ServiceHomeNotConfiguredException(""" + return testRoot.orElseThrow(() -> new IllegalStateException(""" Could not find $WMSA_HOME, either set environment variable, the 'system.homePath' property, or ensure either /wmssa or /var/lib/wmsa exists @@ -60,7 +58,7 @@ public class WmsaHome { var ret = Path.of(retStr.get()); if (!Files.isDirectory(ret.resolve("model"))) { - throw new ServiceHomeNotConfiguredException("You need to run 'run/setup.sh' to download models to run/ before this will work!"); + throw new IllegalStateException("You need to run 'run/setup.sh' to download models to run/ before this will work!"); } return ret; diff --git a/code/common/model/build.gradle b/code/common/model/build.gradle index 80d9f247..6d27b375 100644 --- a/code/common/model/build.gradle +++ b/code/common/model/build.gradle @@ -13,7 +13,6 @@ java { apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:service-discovery') implementation project(':code:libraries:big-string') implementation project(':code:libraries:braille-block-punch-cards') diff --git a/code/common/readme.md b/code/common/readme.md index b6329457..47cf00a3 100644 --- a/code/common/readme.md +++ b/code/common/readme.md @@ -6,6 +6,6 @@ as shared models. * [db](db/) contains SQL code and some database-related utilities. * [config](config/) contains some `@Inject`ables. * [renderer](renderer/) contains utility code for rendering website templates. -* [service](service/) is the shared base classes for main methods and web services. -* [service-discovery](service-discovery) contains tools that lets the services find each other and communicate. +* [service](service/) is the shared base classes for main methods and web services, + including a service registry interface. * [process](process/) contains boiler plate for batch processes. diff --git a/code/common/service-discovery/build.gradle b/code/common/service-discovery/build.gradle deleted file mode 100644 index db87eb95..00000000 --- a/code/common/service-discovery/build.gradle +++ /dev/null @@ -1,41 +0,0 @@ -plugins { - id 'java' - -} - -repositories { - mavenLocal() - mavenCentral() - - repositories { - mavenCentral() - maven { url 'https://jitpack.io' } - } -} - -java { - toolchain { - languageVersion.set(JavaLanguageVersion.of(21)) - } -} -apply from: "$rootProject.projectDir/srcsets.gradle" - -dependencies { - implementation libs.bundles.slf4j - - implementation libs.bundles.curator - implementation libs.guice - implementation libs.bundles.gson - implementation libs.bundles.mariadb - implementation libs.bundles.grpc - implementation libs.notnull - - testImplementation libs.bundles.slf4j.test - testImplementation libs.bundles.junit - testImplementation libs.mockito - - testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') - testImplementation 'org.testcontainers:mariadb:1.17.4' - testImplementation 'org.testcontainers:junit-jupiter:1.17.4' - testImplementation project(':code:functions:math:api') -} diff --git a/code/common/service-discovery/readme.md b/code/common/service-discovery/readme.md deleted file mode 100644 index 366b3ba8..00000000 --- a/code/common/service-discovery/readme.md +++ /dev/null @@ -1,149 +0,0 @@ -# Service Discovery - -Contains classes for helping services discover each other, -and managing connections between them. - -## Service Registry - -The service registry is a class that keeps track of the services -that are currently running, and their connection information. - -The service register implementation is based on [Zookeeper](https://zookeeper.apache.org/), -which is a distributed coordination service. This lets services register -themselves and announce their liveness, and then discover each other. - -It supports multiple instances of a service running, and -supports running the system bare-metal, where it will assign -ports to the services from a range. - -* REST services are registered on a per-node basis, and are always non-partitioned. -* gRPC services are registered on a per-api basis, and can be partitioned - or non-partitioned. This means that if a gRPC api is moved between nodes, - the clients will not need to be reconfigured. - -To be discoverable, the caller must first register their -services: - -```java -// Register one or more services -serviceRegistry.registerService( - ServiceKey.forRest(serviceId, nodeId), - instanceUuid, // unique - externalAddress); // bind-address - -// Non-partitioned GRPC service -serviceRegistry.registerService( - ServiceKey.forServiceDescriptor(descriptor, ServicePartition.any()), - instanceUuid, - externalAddress); - -// Partitioned GRPC service -serviceRegistry.registerService( - ServiceKey.forServiceDescriptor(descriptor, ServicePartition.partition(5)), - instanceUuid, - externalAddress); - -// (+ any other services) -``` - -Then, the caller must announce their instance. Before this is done, -the service is not discoverable. - -```java -registry.announceInstance(instanceUUID); -``` - -All of this is done automatically by the `Service` base class -in the [service](../service/) module. - -To discover a service, the caller can query the registry: - -```java -Set endpoints = registry.getEndpoints(serviceKey); -``` - -It's also possible to subscribe to changes in the registry, so that -the caller can be notified when a service comes or goes, with `registry.registerMonitor()`. - -However the `GrpcChannelPoolFactory` is a more convenient way to access the services, -it will let the caller create a pool of channels to the services, and manage their -lifecycle, listen to lifecycle notifications and so on. - -## gRPC Channel Pool - -From the [GrpcChannelPoolFactory](java/nu/marginalia/service/client/GrpcChannelPoolFactory.java), two types of channel pools can be created -that are aware of the service registry: - -* [GrpcMultiNodeChannelPool](java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java) - This pool permits 1-n style communication with partitioned services -* [GrpcSingleNodeChannelPool](java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java) - This pool permits 1-1 style communication with non-partitioned services. - if multiple instances are running, it will use one of them and fall back - to another if the first is not available. - -The pools can generate calls to the gRPC services, and will manage the lifecycle of the channels. - -The API is designed to be simple to use, and will permit the caller to access the Stub interfaces -for the services through a fluent API. - -### Example Usage of the GrpcSingleNodeChannelPool - -```java -// create a pool for a non-partitioned service -channelPool = factory.createSingle( - ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any()), - MathApiGrpc::newBlockingStub); - -// blocking call -Response response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .run(request); - -// sequential blocking calls -List response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .runFor(request1, request2); - - -// async call -Future response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .async(myExecutor) - .run(request); - -// multiple async calls -Future> response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .async(myExecutor) - .runFor(request1, request2); -``` - -### Example Usage of the GrpcSingleNodeChannelPool - -```java -// create a pool for a partitioned service -channelPool = factory.createMulti( - ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.multi()), - MathApiGrpc::newBlockingStub); - -// blocking call -List response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .run(request); - -// async call -Future> response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .async(myExecutor) - .runEach(request); - -// async call, will fail or succeed as a group -Future> response = channelPool - .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) - .async(myExecutor) - .runAll(request1, request2); -``` - - -### Central Classes - -* [ServiceRegistryIf](java/nu/marginalia/service/discovery/ServiceRegistryIf.java) -* [ZkServiceRegistry](java/nu/marginalia/service/discovery/ZkServiceRegistry.java) \ No newline at end of file diff --git a/code/common/service/build.gradle b/code/common/service/build.gradle index 75fab4ca..fa109b0e 100644 --- a/code/common/service/build.gradle +++ b/code/common/service/build.gradle @@ -12,7 +12,6 @@ java { apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { - implementation project(':code:common:service-discovery') implementation project(':code:libraries:message-queue') implementation project(':code:common:db') implementation project(':code:common:config') @@ -25,11 +24,18 @@ dependencies { implementation libs.bundles.prometheus implementation libs.bundles.slf4j implementation libs.bucket4j + implementation libs.notnull + implementation libs.bundles.curator implementation libs.bundles.flyway testImplementation libs.bundles.slf4j.test implementation libs.bundles.mariadb + testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4') + testImplementation 'org.testcontainers:mariadb:1.17.4' + testImplementation 'org.testcontainers:junit-jupiter:1.17.4' + testImplementation project(':code:libraries:test-helpers') + testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.junit testImplementation libs.mockito diff --git a/code/common/service/java/nu/marginalia/service/MainClass.java b/code/common/service/java/nu/marginalia/service/MainClass.java index 18bedc51..2953b875 100644 --- a/code/common/service/java/nu/marginalia/service/MainClass.java +++ b/code/common/service/java/nu/marginalia/service/MainClass.java @@ -1,7 +1,6 @@ package nu.marginalia.service; import io.prometheus.client.hotspot.DefaultExports; -import nu.marginalia.service.id.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/service-discovery/java/nu/marginalia/service/NodeConfigurationWatcher.java b/code/common/service/java/nu/marginalia/service/NodeConfigurationWatcher.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/NodeConfigurationWatcher.java rename to code/common/service/java/nu/marginalia/service/NodeConfigurationWatcher.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/id/ServiceId.java b/code/common/service/java/nu/marginalia/service/ServiceId.java similarity index 95% rename from code/common/service-discovery/java/nu/marginalia/service/id/ServiceId.java rename to code/common/service/java/nu/marginalia/service/ServiceId.java index 26f52b6b..e9fb960d 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/id/ServiceId.java +++ b/code/common/service/java/nu/marginalia/service/ServiceId.java @@ -1,4 +1,4 @@ -package nu.marginalia.service.id; +package nu.marginalia.service; public enum ServiceId { diff --git a/code/common/service-discovery/java/nu/marginalia/service/ServiceMonitors.java b/code/common/service/java/nu/marginalia/service/ServiceMonitors.java similarity index 99% rename from code/common/service-discovery/java/nu/marginalia/service/ServiceMonitors.java rename to code/common/service/java/nu/marginalia/service/ServiceMonitors.java index b3b497ef..9b5276b2 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/ServiceMonitors.java +++ b/code/common/service/java/nu/marginalia/service/ServiceMonitors.java @@ -3,7 +3,6 @@ package nu.marginalia.service; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.service.id.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java b/code/common/service/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java similarity index 95% rename from code/common/service-discovery/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java rename to code/common/service/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java index 57e72fa6..a1bd19b1 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java +++ b/code/common/service/java/nu/marginalia/service/client/GrpcChannelPoolFactory.java @@ -4,7 +4,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; -import nu.marginalia.service.NamedExecutorFactory; +import nu.marginalia.util.NamedExecutorFactory; import nu.marginalia.service.NodeConfigurationWatcher; import nu.marginalia.service.discovery.ServiceRegistryIf; import nu.marginalia.service.discovery.property.PartitionTraits; @@ -57,6 +57,7 @@ public class GrpcChannelPoolFactory { .forAddress(route.host(), route.port()) .executor(executor) .offloadExecutor(offloadExecutor) + .idleTimeout(120, java.util.concurrent.TimeUnit.SECONDS) .usePlaintext() .build(); diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java b/code/common/service/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java rename to code/common/service/java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java b/code/common/service/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java similarity index 96% rename from code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java rename to code/common/service/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java index cc2b5f91..a002e529 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java +++ b/code/common/service/java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java @@ -74,6 +74,14 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { return true; } + // Mostly for testing + public synchronized void stop() { + for (var channel : channels.values()) { + channel.closeHard(); + } + channels.clear(); + } + private class ConnectionHolder implements Comparable { private final AtomicReference channel = new AtomicReference<>(); private final InstanceAddress address; @@ -116,6 +124,12 @@ public class GrpcSingleNodeChannelPool extends ServiceChangeMonitor { mc.shutdown(); } } + public void closeHard() { + ManagedChannel mc = channel.getAndSet(null); + if (mc != null) { + mc.shutdownNow(); + } + } @Override public boolean equals(Object o) { diff --git a/code/common/service-discovery/java/nu/marginalia/service/client/ServiceNotAvailableException.java b/code/common/service/java/nu/marginalia/service/client/ServiceNotAvailableException.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/client/ServiceNotAvailableException.java rename to code/common/service/java/nu/marginalia/service/client/ServiceNotAvailableException.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/ServiceRegistryIf.java b/code/common/service/java/nu/marginalia/service/discovery/ServiceRegistryIf.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/ServiceRegistryIf.java rename to code/common/service/java/nu/marginalia/service/discovery/ServiceRegistryIf.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/ZkServiceRegistry.java b/code/common/service/java/nu/marginalia/service/discovery/ZkServiceRegistry.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/ZkServiceRegistry.java rename to code/common/service/java/nu/marginalia/service/discovery/ZkServiceRegistry.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/monitor/ServiceChangeMonitor.java b/code/common/service/java/nu/marginalia/service/discovery/monitor/ServiceChangeMonitor.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/monitor/ServiceChangeMonitor.java rename to code/common/service/java/nu/marginalia/service/discovery/monitor/ServiceChangeMonitor.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/monitor/ServiceMonitorIf.java b/code/common/service/java/nu/marginalia/service/discovery/monitor/ServiceMonitorIf.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/monitor/ServiceMonitorIf.java rename to code/common/service/java/nu/marginalia/service/discovery/monitor/ServiceMonitorIf.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/property/PartitionTraits.java b/code/common/service/java/nu/marginalia/service/discovery/property/PartitionTraits.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/property/PartitionTraits.java rename to code/common/service/java/nu/marginalia/service/discovery/property/PartitionTraits.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServiceEndpoint.java b/code/common/service/java/nu/marginalia/service/discovery/property/ServiceEndpoint.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServiceEndpoint.java rename to code/common/service/java/nu/marginalia/service/discovery/property/ServiceEndpoint.java diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServiceKey.java b/code/common/service/java/nu/marginalia/service/discovery/property/ServiceKey.java similarity index 98% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServiceKey.java rename to code/common/service/java/nu/marginalia/service/discovery/property/ServiceKey.java index 66ae5ded..5f2b3a40 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServiceKey.java +++ b/code/common/service/java/nu/marginalia/service/discovery/property/ServiceKey.java @@ -1,7 +1,7 @@ package nu.marginalia.service.discovery.property; import io.grpc.ServiceDescriptor; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; public sealed interface ServiceKey

{ String toPath(); diff --git a/code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServicePartition.java b/code/common/service/java/nu/marginalia/service/discovery/property/ServicePartition.java similarity index 100% rename from code/common/service-discovery/java/nu/marginalia/service/discovery/property/ServicePartition.java rename to code/common/service/java/nu/marginalia/service/discovery/property/ServicePartition.java diff --git a/code/common/service/java/nu/marginalia/service/module/ServiceConfiguration.java b/code/common/service/java/nu/marginalia/service/module/ServiceConfiguration.java index e64c88a8..db3ef975 100644 --- a/code/common/service/java/nu/marginalia/service/module/ServiceConfiguration.java +++ b/code/common/service/java/nu/marginalia/service/module/ServiceConfiguration.java @@ -1,6 +1,6 @@ package nu.marginalia.service.module; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import java.util.UUID; diff --git a/code/common/service/java/nu/marginalia/service/module/ServiceConfigurationModule.java b/code/common/service/java/nu/marginalia/service/module/ServiceConfigurationModule.java index 8ed7f45c..157f6673 100644 --- a/code/common/service/java/nu/marginalia/service/module/ServiceConfigurationModule.java +++ b/code/common/service/java/nu/marginalia/service/module/ServiceConfigurationModule.java @@ -2,7 +2,7 @@ package nu.marginalia.service.module; import com.google.inject.AbstractModule; import com.google.inject.name.Names; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/common/service-discovery/java/nu/marginalia/service/ServiceDiscoveryModule.java b/code/common/service/java/nu/marginalia/service/module/ServiceDiscoveryModule.java similarity index 96% rename from code/common/service-discovery/java/nu/marginalia/service/ServiceDiscoveryModule.java rename to code/common/service/java/nu/marginalia/service/module/ServiceDiscoveryModule.java index 6daa084c..9863a069 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/ServiceDiscoveryModule.java +++ b/code/common/service/java/nu/marginalia/service/module/ServiceDiscoveryModule.java @@ -1,4 +1,4 @@ -package nu.marginalia.service; +package nu.marginalia.service.module; import com.google.inject.AbstractModule; import nu.marginalia.service.discovery.ServiceRegistryIf; @@ -9,8 +9,6 @@ import org.apache.curator.retry.ExponentialBackoffRetry; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Optional; - /** Provides a Guice module for service discovery. */ public class ServiceDiscoveryModule extends AbstractModule { diff --git a/code/common/service/java/nu/marginalia/service/server/GrpcServer.java b/code/common/service/java/nu/marginalia/service/server/GrpcServer.java new file mode 100644 index 00000000..938538b8 --- /dev/null +++ b/code/common/service/java/nu/marginalia/service/server/GrpcServer.java @@ -0,0 +1,59 @@ +package nu.marginalia.service.server; + +import io.grpc.BindableService; +import io.grpc.Server; +import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder; +import io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoopGroup; +import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; +import nu.marginalia.service.discovery.ServiceRegistryIf; +import nu.marginalia.service.discovery.property.ServiceKey; +import nu.marginalia.service.discovery.property.ServicePartition; +import nu.marginalia.service.module.ServiceConfiguration; +import nu.marginalia.util.NamedExecutorFactory; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.List; + +public class GrpcServer { + private final Server server; + public GrpcServer(ServiceConfiguration config, + ServiceRegistryIf serviceRegistry, + ServicePartition partition, + List grpcServices) throws Exception { + + int port = serviceRegistry.requestPort(config.externalAddress(), new ServiceKey.Grpc<>("-", partition)); + + int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16); + + // Start the gRPC server + var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port)) + .executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads)) + .workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads))) + .bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads))) + .channelType(NioServerSocketChannel.class); + + for (var grpcService : grpcServices) { + var svc = grpcService.bindService(); + + serviceRegistry.registerService( + ServiceKey.forServiceDescriptor(svc.getServiceDescriptor(), partition), + config.instanceUuid(), + config.externalAddress() + ); + + grpcServerBuilder.addService(svc); + } + server = grpcServerBuilder.build(); + } + + public void start() throws IOException { + server.start(); + } + + public void stop() throws InterruptedException { + server.shutdownNow(); + server.awaitTermination(); + } + +} diff --git a/code/common/service/java/nu/marginalia/service/server/Service.java b/code/common/service/java/nu/marginalia/service/server/Service.java index 040e9258..283ed439 100644 --- a/code/common/service/java/nu/marginalia/service/server/Service.java +++ b/code/common/service/java/nu/marginalia/service/server/Service.java @@ -7,10 +7,10 @@ import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel; import io.prometheus.client.Counter; import lombok.SneakyThrows; import nu.marginalia.mq.inbox.*; -import nu.marginalia.service.NamedExecutorFactory; +import nu.marginalia.util.NamedExecutorFactory; import nu.marginalia.service.client.ServiceNotAvailableException; import nu.marginalia.service.discovery.property.*; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.server.mq.ServiceMqSubscription; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,11 +23,6 @@ import spark.Spark; import java.net.InetSocketAddress; import java.util.List; import java.util.Optional; -import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; public class Service { private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -54,6 +49,7 @@ public class Service { protected final MqInboxIf messageQueueInbox; private final int node; + private GrpcServer grpcServer; @SneakyThrows public Service(BaseServiceParams params, @@ -134,30 +130,8 @@ public class Service { Spark.get("/internal/started", this::isInitialized); Spark.get("/internal/ready", this::isReady); - int port = params.serviceRegistry.requestPort(config.externalAddress(), new ServiceKey.Grpc<>("-", partition)); - - - int nThreads = Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16); - - // Start the gRPC server - var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port)) - .executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads)) - .workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads))) - .bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads))) - .channelType(NioServerSocketChannel.class); - - for (var grpcService : grpcServices) { - var svc = grpcService.bindService(); - - params.serviceRegistry.registerService( - ServiceKey.forServiceDescriptor(svc.getServiceDescriptor(), partition), - config.instanceUuid(), - config.externalAddress() - ); - - grpcServerBuilder.addService(svc); - } - grpcServerBuilder.build().start(); + grpcServer = new GrpcServer(config, serviceRegistry, partition, grpcServices); + grpcServer.start(); } } diff --git a/code/common/service-discovery/java/nu/marginalia/service/NamedExecutorFactory.java b/code/common/service/java/nu/marginalia/util/NamedExecutorFactory.java similarity index 97% rename from code/common/service-discovery/java/nu/marginalia/service/NamedExecutorFactory.java rename to code/common/service/java/nu/marginalia/util/NamedExecutorFactory.java index 0cc84226..fc5c07f9 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/NamedExecutorFactory.java +++ b/code/common/service/java/nu/marginalia/util/NamedExecutorFactory.java @@ -1,4 +1,4 @@ -package nu.marginalia.service; +package nu.marginalia.util; import org.jetbrains.annotations.NotNull; diff --git a/code/common/service-discovery/java/nu/marginalia/service/ServiceHomeNotConfiguredException.java b/code/common/service/java/nu/marginalia/util/ServiceHomeNotConfiguredException.java similarity index 84% rename from code/common/service-discovery/java/nu/marginalia/service/ServiceHomeNotConfiguredException.java rename to code/common/service/java/nu/marginalia/util/ServiceHomeNotConfiguredException.java index 504abdc0..d858f9ab 100644 --- a/code/common/service-discovery/java/nu/marginalia/service/ServiceHomeNotConfiguredException.java +++ b/code/common/service/java/nu/marginalia/util/ServiceHomeNotConfiguredException.java @@ -1,4 +1,4 @@ -package nu.marginalia.service; +package nu.marginalia.util; public class ServiceHomeNotConfiguredException extends RuntimeException { public ServiceHomeNotConfiguredException(String message) { diff --git a/code/common/service/readme.md b/code/common/service/readme.md index 79d0a613..5dc54cfa 100644 --- a/code/common/service/readme.md +++ b/code/common/service/readme.md @@ -55,4 +55,156 @@ The service should also be given a canonical name in the `ServiceId` enum. ## Central Classes * [MainClass](java/nu/marginalia/service/MainClass.java) bootstraps all executables -* [Service](java/nu/marginalia/service/server/Service.java) base class for all services. \ No newline at end of file +* [Service](java/nu/marginalia/service/server/Service.java) base class for all services. + +--- + +# Service Discovery + +The module also contains classes for helping services discover each other, +and managing connections between them. + +## Service Registry + +The service registry is a class that keeps track of the services +that are currently running, and their connection information. + +The service register implementation is based on [Zookeeper](https://zookeeper.apache.org/), +which is a distributed coordination service. This lets services register +themselves and announce their liveness, and then discover each other. + +It supports multiple instances of a service running, and +supports running the system bare-metal, where it will assign +ports to the services from a range. + +* REST services are registered on a per-node basis, and are always non-partitioned. +* gRPC services are registered on a per-api basis, and can be partitioned + or non-partitioned. This means that if a gRPC api is moved between nodes, + the clients will not need to be reconfigured. + +To be discoverable, the caller must first register their +services: + +```java +// Register one or more services +serviceRegistry.registerService( + ServiceKey.forRest(serviceId, nodeId), + instanceUuid, // unique + externalAddress); // bind-address + +// Non-partitioned GRPC service +serviceRegistry.registerService( + ServiceKey.forServiceDescriptor(descriptor, ServicePartition.any()), + instanceUuid, + externalAddress); + +// Partitioned GRPC service +serviceRegistry.registerService( + ServiceKey.forServiceDescriptor(descriptor, ServicePartition.partition(5)), + instanceUuid, + externalAddress); + +// (+ any other services) +``` + +Then, the caller must announce their instance. Before this is done, +the service is not discoverable. + +```java +registry.announceInstance(instanceUUID); +``` + +All of this is done automatically by the `Service` base class +in the [service](../service/) module. + +To discover a service, the caller can query the registry: + +```java +Set endpoints = registry.getEndpoints(serviceKey); +``` + +It's also possible to subscribe to changes in the registry, so that +the caller can be notified when a service comes or goes, with `registry.registerMonitor()`. + +However the `GrpcChannelPoolFactory` is a more convenient way to access the services, +it will let the caller create a pool of channels to the services, and manage their +lifecycle, listen to lifecycle notifications and so on. + +## gRPC Channel Pool + +From the [GrpcChannelPoolFactory](java/nu/marginalia/service/client/GrpcChannelPoolFactory.java), two types of channel pools can be created +that are aware of the service registry: + +* [GrpcMultiNodeChannelPool](java/nu/marginalia/service/client/GrpcMultiNodeChannelPool.java) - This pool permits 1-n style communication with partitioned services +* [GrpcSingleNodeChannelPool](java/nu/marginalia/service/client/GrpcSingleNodeChannelPool.java) - This pool permits 1-1 style communication with non-partitioned services. + if multiple instances are running, it will use one of them and fall back + to another if the first is not available. + +The pools can generate calls to the gRPC services, and will manage the lifecycle of the channels. + +The API is designed to be simple to use, and will permit the caller to access the Stub interfaces +for the services through a fluent API. + +### Example Usage of the GrpcSingleNodeChannelPool + +```java +// create a pool for a non-partitioned service +channelPool = factory.createSingle( + ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any()), + MathApiGrpc::newBlockingStub); + +// blocking call +Response response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .run(request); + +// sequential blocking calls +List response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .runFor(request1, request2); + + +// async call +Future response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .async(myExecutor) + .run(request); + +// multiple async calls +Future> response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .async(myExecutor) + .runFor(request1, request2); +``` + +### Example Usage of the GrpcSingleNodeChannelPool + +```java +// create a pool for a partitioned service +channelPool = factory.createMulti( + ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.multi()), + MathApiGrpc::newBlockingStub); + +// blocking call +List response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .run(request); + +// async call +Future> response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .async(myExecutor) + .runEach(request); + +// async call, will fail or succeed as a group +Future> response = channelPool + .call(MathApiGrpc.MathApiBlockingStub::dictionaryLookup) + .async(myExecutor) + .runAll(request1, request2); +``` + + +### Central Classes + +* [ServiceRegistryIf](java/nu/marginalia/service/discovery/ServiceRegistryIf.java) +* [ZkServiceRegistry](java/nu/marginalia/service/discovery/ZkServiceRegistry.java) \ No newline at end of file diff --git a/code/common/service-discovery/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java b/code/common/service/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java similarity index 94% rename from code/common/service-discovery/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java rename to code/common/service/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java index 3ea107e1..bd0adff7 100644 --- a/code/common/service-discovery/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java +++ b/code/common/service/test/nu/marginalia/service/discovery/ZkServiceRegistryTest.java @@ -1,9 +1,9 @@ package nu.marginalia.service.discovery; -import nu.marginalia.api.math.MathApiGrpc; import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServicePartition; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; +import nu.marginalia.test.TestApiGrpc; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.ExponentialBackoffRetry; import org.junit.jupiter.api.*; @@ -94,7 +94,7 @@ class ZkServiceRegistryTest { var registry2 = createRegistry(); var key1 = ServiceKey.forRest(ServiceId.Search, 0); - var key2 = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any()); + var key2 = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.any()); var endpoint1 = registry1.registerService(key1, uuid1, "127.0.0.1"); var endpoint2 = registry2.registerService(key2, uuid2, "127.0.0.2"); @@ -123,7 +123,7 @@ class ZkServiceRegistryTest { var registry1 = createRegistry(); var registry2 = createRegistry(); - var key = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.any()); + var key = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.any()); var endpoint1 = registry1.registerService(key, uuid1, "127.0.0.1"); var endpoint2 = registry2.registerService(key, uuid2, "127.0.0.2"); @@ -149,8 +149,8 @@ class ZkServiceRegistryTest { var registry1 = createRegistry(); var registry2 = createRegistry(); - var key1 = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.partition(1)); - var key2 = ServiceKey.forGrpcApi(MathApiGrpc.class, ServicePartition.partition(2)); + var key1 = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.partition(1)); + var key2 = ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.partition(2)); var endpoint1 = registry1.registerService(key1, uuid1, "127.0.0.1"); var endpoint2 = registry2.registerService(key2, uuid2, "127.0.0.2"); diff --git a/code/common/service/test/nu/marginalia/service/server/GrpcServerTest.java b/code/common/service/test/nu/marginalia/service/server/GrpcServerTest.java new file mode 100644 index 00000000..2ef03417 --- /dev/null +++ b/code/common/service/test/nu/marginalia/service/server/GrpcServerTest.java @@ -0,0 +1,160 @@ +package nu.marginalia.service.server; + +import io.grpc.BindableService; +import io.grpc.stub.StreamObserver; +import nu.marginalia.service.ServiceId; +import nu.marginalia.service.client.GrpcChannelPoolFactory; +import nu.marginalia.service.client.GrpcSingleNodeChannelPool; +import nu.marginalia.service.discovery.ServiceRegistryIf; +import nu.marginalia.service.discovery.property.ServiceEndpoint; +import nu.marginalia.service.discovery.property.ServiceKey; +import nu.marginalia.service.discovery.property.ServicePartition; +import nu.marginalia.service.module.ServiceConfiguration; +import nu.marginalia.test.RpcInteger; +import nu.marginalia.test.TestApiGrpc; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.*; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + + +public class GrpcServerTest { + + List servers = new ArrayList<>(); + List> clients = new ArrayList<>(); + + @AfterEach + public void close() { + for (var server : servers) { + try { + server.stop(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + for (var client : clients) { + client.stop(); + } + servers.clear(); + clients.clear(); + } + + @Test + public void testStartStopServer() throws Exception { + var server = createServerOnPort(randomHighPort(), UUID.randomUUID(), new TestGrpcService()); + + server.start(); + + server.stop(); + } + + @Test + public void testClient() throws Exception { + int port = randomHighPort(); + UUID serverUUID = UUID.randomUUID(); + + var server = createServerOnPort(port, serverUUID, new TestGrpcService()); + server.start(); + + var mockRegistry = Mockito.mock(ServiceRegistryIf.class); + when(mockRegistry.getEndpoints(any())).thenReturn( + Set.of(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID))); + + var client = createClient(mockRegistry); + client.onChange(); + + var ret = client.call(TestApiGrpc.TestApiBlockingStub::increment) + .run(RpcInteger.newBuilder().setValue(1).build()); + Assertions.assertEquals(2, ret.getValue()); + } + + @Test + public void testClientConnSwitch() throws Exception { + int port = randomHighPort(); + UUID serverUUID1 = UUID.randomUUID(); + UUID serverUUID2 = UUID.randomUUID(); + + var server1 = createServerOnPort(port, serverUUID1, new TestGrpcService()); + + server1.start(); + + Set endpoints = new HashSet<>(); + endpoints.add(new ServiceEndpoint("127.0.0.1", port).asInstance(serverUUID1)); + + var mockRegistry = Mockito.mock(ServiceRegistryIf.class); + when(mockRegistry.getEndpoints(any())).thenReturn(endpoints); + + var client = createClient(mockRegistry); + + var ret = client.call(TestApiGrpc.TestApiBlockingStub::increment) + .run(RpcInteger.newBuilder().setValue(1).build()); + System.out.println(ret); + + server1.stop(); + var server2 = createServerOnPort(port + 1, serverUUID2, new TestGrpcService()); + endpoints.add(new ServiceEndpoint("127.0.0.1", port + 1).asInstance(serverUUID2)); + server2.start(); + client.onChange(); + + ret = client.call(TestApiGrpc.TestApiBlockingStub::increment) + .run(RpcInteger.newBuilder().setValue(1).build()); + System.out.println(ret); + + endpoints.clear(); + endpoints.add(new ServiceEndpoint("127.0.0.1", port + 1).asInstance(serverUUID2)); + client.onChange(); + + ret = client.call(TestApiGrpc.TestApiBlockingStub::increment) + .run(RpcInteger.newBuilder().setValue(1).build()); + System.out.println(ret); + + } + + private GrpcServer createServerOnPort(int port, UUID uuid, BindableService... services) throws Exception { + var mockRegistry = Mockito.mock(ServiceRegistryIf.class); + when(mockRegistry.requestPort(any(), any())).thenReturn(port); + + var config = new ServiceConfiguration(ServiceId.Api, 1, + "127.0.0.1", "127.0.0.1", -1, uuid); + + var server = new GrpcServer(config, mockRegistry, ServicePartition.any(), List.of(services)); + servers.add(server); + return server; + } + + private GrpcSingleNodeChannelPool createClient(ServiceRegistryIf mockRegistry) { + var client = new GrpcChannelPoolFactory(null, mockRegistry).createSingle( + ServiceKey.forGrpcApi(TestApiGrpc.class, ServicePartition.any()), + TestApiGrpc::newBlockingStub); + clients.add(client); + return client; + } + + private int randomHighPort() { + return 12000 + (int) (Math.random() * 1000); + } + + private static class TestGrpcService extends TestApiGrpc.TestApiImplBase { + + @Override + public void increment(RpcInteger request, StreamObserver obs) { + obs.onNext(RpcInteger.newBuilder().setValue(request.getValue() + 1).build()); + obs.onCompleted(); + } + + @Override + public void count(RpcInteger request, StreamObserver obs) { + for (int i = 0; i < request.getValue(); i++) { + obs.onNext(RpcInteger.newBuilder().setValue(i).build()); + } + obs.onCompleted(); + } + } + + +} diff --git a/code/execution/api/build.gradle b/code/execution/api/build.gradle index b31d87d3..5102b613 100644 --- a/code/execution/api/build.gradle +++ b/code/execution/api/build.gradle @@ -30,18 +30,17 @@ dependencies { implementation project(':code:common:model') implementation project(':code:index:api') implementation project(':code:common:config') + implementation project(':code:common:service') implementation project(':code:common:db') implementation project(':code:libraries:message-queue') - implementation project(':code:common:service-discovery') implementation libs.bundles.slf4j implementation libs.prometheus implementation libs.notnull implementation libs.guice - implementation libs.protobuf + implementation libs.bundles.protobuf implementation libs.bundles.grpc - implementation libs.javax.annotation implementation libs.gson testImplementation libs.bundles.slf4j.test diff --git a/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java b/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java index 91f890fa..c09bdc27 100644 --- a/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/execution/api/java/nu/marginalia/executor/client/ExecutorClient.java @@ -14,7 +14,7 @@ import nu.marginalia.service.client.GrpcMultiNodeChannelPool; import nu.marginalia.service.discovery.ServiceRegistryIf; import nu.marginalia.service.discovery.property.ServiceKey; import nu.marginalia.service.discovery.property.ServicePartition; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import nu.marginalia.storage.model.FileStorageId; import org.slf4j.Logger; diff --git a/code/execution/build.gradle b/code/execution/build.gradle index a3cec39f..8c8fd70b 100644 --- a/code/execution/build.gradle +++ b/code/execution/build.gradle @@ -28,7 +28,6 @@ dependencies { implementation project(':code:common:linkdb') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':third-party:commons-codec') @@ -57,7 +56,6 @@ dependencies { implementation libs.notnull implementation libs.guice implementation libs.trove - implementation libs.protobuf implementation libs.zstd implementation libs.jsoup implementation libs.commons.io diff --git a/code/functions/domain-info/api/build.gradle b/code/functions/domain-info/api/build.gradle index c99c4fbf..0c4264ec 100644 --- a/code/functions/domain-info/api/build.gradle +++ b/code/functions/domain-info/api/build.gradle @@ -19,7 +19,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:service-discovery') + implementation project(':code:common:service') implementation libs.bundles.slf4j @@ -27,8 +27,7 @@ dependencies { implementation libs.notnull implementation libs.guice implementation libs.gson - implementation libs.protobuf - implementation libs.javax.annotation + implementation libs.bundles.protobuf implementation libs.bundles.grpc testImplementation libs.bundles.slf4j.test diff --git a/code/functions/domain-info/build.gradle b/code/functions/domain-info/build.gradle index 4858e935..afb2b358 100644 --- a/code/functions/domain-info/build.gradle +++ b/code/functions/domain-info/build.gradle @@ -21,7 +21,6 @@ dependencies { implementation project(':code:common:service') implementation project(':code:common:model') implementation project(':code:common:db') - implementation project(':code:common:service-discovery') implementation project(':code:libraries:geo-ip') diff --git a/code/functions/link-graph/aggregate/build.gradle b/code/functions/link-graph/aggregate/build.gradle index 52be585f..41f89bef 100644 --- a/code/functions/link-graph/aggregate/build.gradle +++ b/code/functions/link-graph/aggregate/build.gradle @@ -19,7 +19,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:service') implementation project(':code:common:model') - implementation project(':code:common:service-discovery') implementation libs.bundles.slf4j diff --git a/code/functions/link-graph/api/build.gradle b/code/functions/link-graph/api/build.gradle index ffeab7f8..99dd4a36 100644 --- a/code/functions/link-graph/api/build.gradle +++ b/code/functions/link-graph/api/build.gradle @@ -19,7 +19,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:service-discovery') + implementation project(':code:common:service') implementation libs.bundles.slf4j @@ -27,9 +27,8 @@ dependencies { implementation libs.notnull implementation libs.guice implementation libs.gson - implementation libs.protobuf + implementation libs.bundles.protobuf implementation libs.roaringbitmap - implementation libs.javax.annotation implementation libs.bundles.grpc testImplementation libs.bundles.slf4j.test diff --git a/code/functions/link-graph/partition/build.gradle b/code/functions/link-graph/partition/build.gradle index 43a2e654..faca528f 100644 --- a/code/functions/link-graph/partition/build.gradle +++ b/code/functions/link-graph/partition/build.gradle @@ -21,7 +21,6 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:linkdb') implementation project(':code:common:db') - implementation project(':code:common:service-discovery') implementation libs.bundles.slf4j diff --git a/code/functions/math/api/build.gradle b/code/functions/math/api/build.gradle index 779c4508..6811a06d 100644 --- a/code/functions/math/api/build.gradle +++ b/code/functions/math/api/build.gradle @@ -19,7 +19,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:service-discovery') + implementation project(':code:common:service') implementation libs.bundles.slf4j @@ -27,8 +27,7 @@ dependencies { implementation libs.notnull implementation libs.guice implementation libs.gson - implementation libs.protobuf - implementation libs.javax.annotation + implementation libs.bundles.protobuf implementation libs.bundles.grpc testImplementation libs.bundles.slf4j.test diff --git a/code/functions/search-query/api/build.gradle b/code/functions/search-query/api/build.gradle index 8da48164..5339cbf4 100644 --- a/code/functions/search-query/api/build.gradle +++ b/code/functions/search-query/api/build.gradle @@ -21,8 +21,8 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:model') implementation project(':code:common:config') + implementation project(':code:common:service') implementation project(':code:index:query') - implementation project(':code:common:service-discovery') implementation libs.bundles.slf4j @@ -30,8 +30,7 @@ dependencies { implementation libs.notnull implementation libs.guice implementation libs.gson - implementation libs.protobuf - implementation libs.javax.annotation + implementation libs.bundles.protobuf implementation libs.bundles.grpc implementation libs.fastutil diff --git a/code/functions/search-query/build.gradle b/code/functions/search-query/build.gradle index 14b662f9..86cafefa 100644 --- a/code/functions/search-query/build.gradle +++ b/code/functions/search-query/build.gradle @@ -17,7 +17,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:config') implementation project(':code:common:model') - implementation project(':code:common:service-discovery') implementation project(':code:functions:search-query:api') diff --git a/code/index/api/build.gradle b/code/index/api/build.gradle index 895f75b0..50b3d726 100644 --- a/code/index/api/build.gradle +++ b/code/index/api/build.gradle @@ -14,7 +14,7 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:model') implementation project(':code:common:config') - implementation project(':code:common:service-discovery') + implementation project(':code:common:service') implementation project(':code:libraries:message-queue') implementation project(':code:functions:search-query:api') @@ -23,7 +23,7 @@ dependencies { implementation libs.prometheus implementation libs.notnull implementation libs.guice - implementation libs.protobuf + implementation libs.bundles.protobuf implementation libs.fastutil implementation libs.javax.annotation implementation libs.bundles.gson diff --git a/code/index/api/java/nu/marginalia/index/api/IndexMqClient.java b/code/index/api/java/nu/marginalia/index/api/IndexMqClient.java index 27465f6e..c8fcce7f 100644 --- a/code/index/api/java/nu/marginalia/index/api/IndexMqClient.java +++ b/code/index/api/java/nu/marginalia/index/api/IndexMqClient.java @@ -5,7 +5,7 @@ import com.google.inject.Singleton; import com.google.inject.name.Named; import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.outbox.MqOutbox; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import java.util.UUID; diff --git a/code/index/build.gradle b/code/index/build.gradle index 7d52facc..7d34bab4 100644 --- a/code/index/build.gradle +++ b/code/index/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:linkdb') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':code:functions:search-query:api') diff --git a/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTestModule.java b/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTestModule.java index d04b458c..e61c42d7 100644 --- a/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTestModule.java +++ b/code/index/test/nu/marginalia/index/IndexQueryServiceIntegrationTestModule.java @@ -15,7 +15,7 @@ import nu.marginalia.process.control.FakeProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.index.domainrankings.DomainRankings; import nu.marginalia.service.control.*; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfiguration; import org.mockito.Mockito; diff --git a/code/libraries/test-helpers/build.gradle b/code/libraries/test-helpers/build.gradle index 16244d0a..0066220f 100644 --- a/code/libraries/test-helpers/build.gradle +++ b/code/libraries/test-helpers/build.gradle @@ -1,5 +1,7 @@ plugins { id 'java' + + id "com.google.protobuf" version "0.9.4" } java { @@ -8,9 +10,12 @@ java { } } +apply from: "$rootProject.projectDir/protobuf.gradle" apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { + implementation libs.bundles.protobuf + implementation libs.bundles.grpc implementation libs.bundles.slf4j implementation libs.bundles.mariadb implementation libs.bundles.slf4j.test diff --git a/code/libraries/test-helpers/src/main/protobuf/testapi.proto b/code/libraries/test-helpers/src/main/protobuf/testapi.proto new file mode 100644 index 00000000..1029e868 --- /dev/null +++ b/code/libraries/test-helpers/src/main/protobuf/testapi.proto @@ -0,0 +1,15 @@ +syntax="proto3"; +package nu.marginalia.test; + +option java_package="nu.marginalia.test"; +option java_multiple_files=true; + +/* Dummy API for testing gRPC, service discovery, similar things */ +service TestApi { + rpc increment(RpcInteger) returns (RpcInteger) {} + rpc count(RpcInteger) returns (stream RpcInteger) {} +} + +message RpcInteger { + int32 value = 1; +} diff --git a/code/process-models/crawling-model/build.gradle b/code/process-models/crawling-model/build.gradle index 505db80f..9b846502 100644 --- a/code/process-models/crawling-model/build.gradle +++ b/code/process-models/crawling-model/build.gradle @@ -21,7 +21,6 @@ dependencies { implementation project(':code:common:process') implementation project(':code:libraries:big-string') implementation project(':code:index:api') - implementation project(':code:common:service-discovery') implementation project(':code:features-crawl:content-type') implementation project(':code:libraries:language-processing') implementation project(':third-party:parquet-floor') diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 8c6b616e..77b0b025 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -35,7 +35,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:libraries:message-queue') implementation project(':code:libraries:blocking-thread-pool') - implementation project(':code:common:service-discovery') implementation project(':code:libraries:guarded-regex') implementation project(':code:libraries:easy-lsh') diff --git a/code/processes/crawling-process/build.gradle b/code/processes/crawling-process/build.gradle index 94a60b11..0bc95a4c 100644 --- a/code/processes/crawling-process/build.gradle +++ b/code/processes/crawling-process/build.gradle @@ -31,7 +31,6 @@ dependencies { implementation project(':code:libraries:blocking-thread-pool') implementation project(':code:index:api') implementation project(':code:process-mqapi') - implementation project(':code:common:service-discovery') implementation project(':code:libraries:message-queue') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:easy-lsh') diff --git a/code/processes/loading-process/build.gradle b/code/processes/loading-process/build.gradle index 4a2afc68..b44c27fa 100644 --- a/code/processes/loading-process/build.gradle +++ b/code/processes/loading-process/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:config') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':code:common:linkdb') implementation project(':code:index:index-journal') implementation project(':code:libraries:message-queue') diff --git a/code/processes/website-adjacencies-calculator/build.gradle b/code/processes/website-adjacencies-calculator/build.gradle index 6019d1dd..a6e63d1f 100644 --- a/code/processes/website-adjacencies-calculator/build.gradle +++ b/code/processes/website-adjacencies-calculator/build.gradle @@ -23,7 +23,6 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:db') implementation project(':code:common:process') - implementation project(':code:common:service-discovery') implementation project(':code:common:service') implementation project(':code:functions:link-graph:api') diff --git a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java index 92aae06a..ba270c4e 100644 --- a/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java +++ b/code/processes/website-adjacencies-calculator/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java @@ -10,7 +10,7 @@ import nu.marginalia.model.EdgeDomain; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.service.ProcessMainClass; -import nu.marginalia.service.ServiceDiscoveryModule; +import nu.marginalia.service.module.ServiceDiscoveryModule; import nu.marginalia.service.module.DatabaseModule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/services-application/api-service/build.gradle b/code/services-application/api-service/build.gradle index ff27b83e..726f703d 100644 --- a/code/services-application/api-service/build.gradle +++ b/code/services-application/api-service/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:service') implementation project(':code:common:config') - implementation project(':code:common:service-discovery') implementation project(':code:functions:search-query:api') implementation project(':code:index:query') diff --git a/code/services-application/api-service/java/nu/marginalia/api/ApiMain.java b/code/services-application/api-service/java/nu/marginalia/api/ApiMain.java index cca846d4..3eef045a 100644 --- a/code/services-application/api-service/java/nu/marginalia/api/ApiMain.java +++ b/code/services-application/api-service/java/nu/marginalia/api/ApiMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-application/dating-service/build.gradle b/code/services-application/dating-service/build.gradle index 72a3057a..2435383c 100644 --- a/code/services-application/dating-service/build.gradle +++ b/code/services-application/dating-service/build.gradle @@ -27,7 +27,6 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:model') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':code:common:renderer') implementation project(':code:features-search:screenshots') implementation project(':code:features-search:random-websites') diff --git a/code/services-application/dating-service/java/nu/marginalia/dating/DatingMain.java b/code/services-application/dating-service/java/nu/marginalia/dating/DatingMain.java index d5cf6f4b..cf85016b 100644 --- a/code/services-application/dating-service/java/nu/marginalia/dating/DatingMain.java +++ b/code/services-application/dating-service/java/nu/marginalia/dating/DatingMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-application/explorer-service/build.gradle b/code/services-application/explorer-service/build.gradle index 6d2e56ac..0cac3ad4 100644 --- a/code/services-application/explorer-service/build.gradle +++ b/code/services-application/explorer-service/build.gradle @@ -26,7 +26,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:model') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':code:common:renderer') implementation project(':code:features-search:random-websites') diff --git a/code/services-application/explorer-service/java/nu/marginalia/explorer/ExplorerMain.java b/code/services-application/explorer-service/java/nu/marginalia/explorer/ExplorerMain.java index e48320f7..5a1fd734 100644 --- a/code/services-application/explorer-service/java/nu/marginalia/explorer/ExplorerMain.java +++ b/code/services-application/explorer-service/java/nu/marginalia/explorer/ExplorerMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-application/search-service/build.gradle b/code/services-application/search-service/build.gradle index d5285334..561073d0 100644 --- a/code/services-application/search-service/build.gradle +++ b/code/services-application/search-service/build.gradle @@ -48,7 +48,6 @@ dependencies { implementation project(':code:index:api') - implementation project(':code:common:service-discovery') implementation project(':code:common:renderer') implementation project(':code:features-search:screenshots') diff --git a/code/services-application/search-service/java/nu/marginalia/search/SearchMain.java b/code/services-application/search-service/java/nu/marginalia/search/SearchMain.java index f799759c..01350592 100644 --- a/code/services-application/search-service/java/nu/marginalia/search/SearchMain.java +++ b/code/services-application/search-service/java/nu/marginalia/search/SearchMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-core/assistant-service/build.gradle b/code/services-core/assistant-service/build.gradle index 278bbcdf..f1a25b0f 100644 --- a/code/services-core/assistant-service/build.gradle +++ b/code/services-core/assistant-service/build.gradle @@ -34,7 +34,6 @@ dependencies { implementation project(':code:common:service') implementation project(':code:common:model') implementation project(':code:common:db') - implementation project(':code:common:service-discovery') implementation project(':code:features-search:screenshots') diff --git a/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantMain.java b/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantMain.java index f8c2cc3b..1f583587 100644 --- a/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantMain.java +++ b/code/services-core/assistant-service/java/nu/marginalia/assistant/AssistantMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-core/control-service/build.gradle b/code/services-core/control-service/build.gradle index a14aec08..2f34648f 100644 --- a/code/services-core/control-service/build.gradle +++ b/code/services-core/control-service/build.gradle @@ -32,7 +32,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:renderer') implementation project(':code:libraries:message-queue') - implementation project(':code:common:service-discovery') implementation project(':code:functions:search-query:api') implementation project(':code:execution:api') implementation project(':code:index:api') diff --git a/code/services-core/control-service/java/nu/marginalia/control/ControlMain.java b/code/services-core/control-service/java/nu/marginalia/control/ControlMain.java index a8bbb4f4..6946f8d6 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/ControlMain.java +++ b/code/services-core/control-service/java/nu/marginalia/control/ControlMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-core/control-service/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java b/code/services-core/control-service/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java deleted file mode 100644 index 5facda16..00000000 --- a/code/services-core/control-service/java/nu/marginalia/control/actor/rebalance/RebalanceActor.java +++ /dev/null @@ -1,165 +0,0 @@ -package nu.marginalia.control.actor.rebalance; - -public class RebalanceActor { - /** - // States - - public static final String INIT = "INIT"; - public static final String END = "END"; - - private static final Logger logger = LoggerFactory.getLogger(RebalanceActor.class); - - private final NodeConfigurationService nodeConfigurationService; - private final MqPersistence mqPersistence; - private final HikariDataSource dataSource; - private final Gson gson = GsonFactory.get(); - @Override - public String describe() { - return "Rebalances crawl data among the nodes"; - } - - @Inject - public RebalanceActor(ActorStateFactory stateFactory, - NodeConfigurationService nodeConfigurationService, - MqPersistence mqPersistence, HikariDataSource dataSource) - { - super(stateFactory); - this.nodeConfigurationService = nodeConfigurationService; - this.mqPersistence = mqPersistence; - this.dataSource = dataSource; - } - - @ActorState(name= INIT, next = END, resume = ActorResumeBehavior.ERROR, - description = "Rebalance!") - public void doIt() throws Exception { - var populations = getNodePopulations(); - - if (populations.size() <= 1) { - transition(END); - } - - int average = (int) populations.stream().mapToInt(pop -> pop.count).average().orElse(0); - int tolerance = average / 10; - - PriorityQueue surplusList = new PriorityQueue<>(); - PriorityQueue deficitList = new PriorityQueue<>(); - - populations.forEach(pop -> { - int delta = pop.count - average; - if (delta - tolerance > 0) { - surplusList.add(new Sur(pop.node, delta)); - } - else if (delta + tolerance < 0) { - deficitList.add(new Def(pop.node, -delta)); - } - }); - - List actions = new ArrayList<>(); - - while (!surplusList.isEmpty() && !deficitList.isEmpty()) { - var sur = surplusList.poll(); - var def = deficitList.poll(); - - assert (sur.n != def.n); - - int amount = Math.min(sur.c, def.c); - actions.add(new Give(sur.n, def.n, amount)); - - if (sur.c - amount > tolerance) { - surplusList.add(new Sur(sur.n, sur.c - amount)); - } - if (def.c - amount > tolerance) { - deficitList.add(new Def(def.n, def.c - amount)); - } - } - - for (var action : actions) { - var outbox = new MqOutbox(mqPersistence, "executor-service", action.dest, getClass().getSimpleName(), 0, UUID.randomUUID()); - var msg = outbox.send("TRANSFER-DOMAINS", - gson.toJson(Map.of("sourceNode", action.donor, "count", action.c))); - if (msg.state() != MqMessageState.OK) { - logger.error("ERROR! {}", msg); - } - outbox.stop(); - } - - } - - private List getNodePopulations() throws SQLException { - Map ret = new HashMap<>(); - - try (var conn = dataSource.getConnection(); - var query = conn.prepareStatement(""" - SELECT NODE_AFFINITY, COUNT(*) - FROM EC_DOMAIN - WHERE NODE_AFFINITY > 0 - GROUP BY NODE_AFFINITY - """)) { - var rs = query.executeQuery(); - while (rs.next()) { - ret.put(rs.getInt(1), rs.getInt(2)); - } - } - - for (var node : nodeConfigurationService.getAll()) { - if (isNodeExcluded(node)) { - ret.remove(node.node()); - } else { - ret.putIfAbsent(node.node(), 0); - } - } - - return ret.entrySet().stream().map(e -> new Pop(e.getKey(), e.getValue())).toList(); - } - - private boolean isNodeExcluded(NodeConfiguration node) { - return node.disabled(); - } - - //* 1. calculate sizes for each node using db - // - //2. rebalance - // - //-- find average - //-- calculate surplus and deficit, with a NN% tolerance - //-- create instructions for redistribution - // - //3. instruct each executor to transfer data: - // - //-- transfer domain data - //-- append to receiver crawler log - //-- instruct donor to delete file - // - //4. regenerate crawler logs based on present files on all donor nodes * / - - public record Sur(int n, int c) implements Comparable { - @Override - public int compareTo(@NotNull RebalanceActor.Sur o) { - int d = Integer.compare(o.c, c); - if (d != 0) - return d; - - return Integer.compare(n, o.n); - } - } - public record Def(int n, int c) implements Comparable { - - @Override - public int compareTo(@NotNull RebalanceActor.Def o) { - int d = Integer.compare(o.c, c); - if (d != 0) - return d; - - return Integer.compare(n, o.n); - } - } - - public record Populations(List pops) { - } - public record Pop(int node, int count) { - - } - public record Give(int donor, int dest, int c) { - - } */ -} diff --git a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java index 95a0ec40..a62bbad3 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/node/svc/ControlNodeService.java @@ -15,7 +15,7 @@ import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.nodecfg.model.NodeConfiguration; import nu.marginalia.storage.FileStorageService; import nu.marginalia.executor.client.ExecutorClient; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import nu.marginalia.storage.model.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java b/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java index 726491ec..738e2cd5 100644 --- a/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java +++ b/code/services-core/control-service/java/nu/marginalia/control/sys/svc/ControlSysActionsService.java @@ -12,7 +12,7 @@ import nu.marginalia.mq.MessageQueueFactory; import nu.marginalia.mq.outbox.MqOutbox; import nu.marginalia.nodecfg.NodeConfigurationService; import nu.marginalia.service.control.ServiceEventLog; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.ServiceId; import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.model.FileStorageType; import spark.Request; diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index 69ad7d1f..435e5ec6 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -40,7 +40,6 @@ dependencies { implementation project(':code:common:linkdb') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':third-party:commons-codec') @@ -72,7 +71,6 @@ dependencies { implementation libs.notnull implementation libs.guice implementation libs.trove - implementation libs.protobuf implementation libs.zstd implementation libs.jsoup implementation libs.commons.io diff --git a/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorMain.java b/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorMain.java index 86798d86..1e524bca 100644 --- a/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorMain.java +++ b/code/services-core/executor-service/java/nu/marginalia/executor/ExecutorMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-core/index-service/build.gradle b/code/services-core/index-service/build.gradle index f29a1494..73913d45 100644 --- a/code/services-core/index-service/build.gradle +++ b/code/services-core/index-service/build.gradle @@ -26,7 +26,6 @@ apply from: "$rootProject.projectDir/srcsets.gradle" dependencies { implementation project(':code:common:config') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':code:common:model') implementation project(':code:common:db') implementation project(':code:common:linkdb') @@ -37,8 +36,6 @@ dependencies { implementation project(':code:functions:search-query:api') implementation project(':code:index:api') - implementation project(':code:common:service-discovery') - testImplementation project(path: ':code:services-core:control-service') testImplementation project(':code:common:process') @@ -47,7 +44,6 @@ dependencies { implementation libs.prometheus implementation libs.notnull implementation libs.guice - implementation libs.protobuf implementation libs.bundles.httpcomponents implementation libs.roaringbitmap implementation libs.snakeyaml diff --git a/code/services-core/index-service/java/nu/marginalia/index/IndexMain.java b/code/services-core/index-service/java/nu/marginalia/index/IndexMain.java index 006f313e..a159ca63 100644 --- a/code/services-core/index-service/java/nu/marginalia/index/IndexMain.java +++ b/code/services-core/index-service/java/nu/marginalia/index/IndexMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/services-core/query-service/build.gradle b/code/services-core/query-service/build.gradle index b4e2a179..6857e5e8 100644 --- a/code/services-core/query-service/build.gradle +++ b/code/services-core/query-service/build.gradle @@ -31,7 +31,6 @@ dependencies { implementation project(':code:common:service') implementation project(':code:common:renderer') implementation project(':code:index:api') - implementation project(':code:common:service-discovery') implementation project(':code:index:query') implementation project(':code:functions:search-query') @@ -46,7 +45,6 @@ dependencies { implementation libs.prometheus implementation libs.notnull implementation libs.guice - implementation libs.protobuf implementation libs.bundles.mariadb implementation libs.bundles.grpc diff --git a/code/services-core/query-service/java/nu/marginalia/query/QueryMain.java b/code/services-core/query-service/java/nu/marginalia/query/QueryMain.java index 4034efde..f08491cd 100644 --- a/code/services-core/query-service/java/nu/marginalia/query/QueryMain.java +++ b/code/services-core/query-service/java/nu/marginalia/query/QueryMain.java @@ -4,8 +4,8 @@ import com.google.inject.Guice; import com.google.inject.Inject; import com.google.inject.Injector; import nu.marginalia.service.MainClass; -import nu.marginalia.service.ServiceDiscoveryModule; -import nu.marginalia.service.id.ServiceId; +import nu.marginalia.service.module.ServiceDiscoveryModule; +import nu.marginalia.service.ServiceId; import nu.marginalia.service.module.ServiceConfigurationModule; import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.server.Initialization; diff --git a/code/tools/experiment-runner/build.gradle b/code/tools/experiment-runner/build.gradle index 36001e0b..1923d4dd 100644 --- a/code/tools/experiment-runner/build.gradle +++ b/code/tools/experiment-runner/build.gradle @@ -30,7 +30,6 @@ dependencies { implementation project(':code:common:config') implementation project(':code:common:process') implementation project(':code:common:service') - implementation project(':code:common:service-discovery') implementation project(':code:libraries:language-processing') implementation project(':code:libraries:term-frequency-dict') implementation project(':code:libraries:big-string') diff --git a/settings.gradle b/settings.gradle index fca2091c..06dc71e2 100644 --- a/settings.gradle +++ b/settings.gradle @@ -70,7 +70,6 @@ include 'code:features-crawl:content-type' include 'code:process-mqapi' -include 'code:common:service-discovery' include 'code:common:db' include 'code:common:linkdb' include 'code:common:service' @@ -224,6 +223,7 @@ dependencyResolutionManagement { bundle('selenium', ['selenium.chrome', 'selenium.java']) bundle('handlebars', ['handlebars', 'handlebars.markdown']) bundle('grpc', ['protobuf', 'grpc-stub', 'grpc-protobuf', 'grpc-netty']) + bundle('protobuf', ['protobuf', 'javax.annotation']) bundle('gson', ['gson', 'gson-type-adapter']) bundle('httpcomponents', ['httpcomponents.core', 'httpcomponents.client']) bundle('parquet', ['parquet-column', 'parquet-hadoop'])