From 61288c5e687abb1cc10b42de432b890ab5115b6f Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 9 Oct 2023 22:13:22 +0200 Subject: [PATCH] (service, client) First steps towards multiple nodedness --- .../assistant/client/AssistantClient.java | 8 +- .../marginalia/index/client/IndexClient.java | 11 +- .../marginalia/query/client/QueryClient.java | 5 +- .../nu/marginalia/client/AbstractClient.java | 198 ++++++++++-------- .../client/AbstractDynamicClient.java | 15 +- .../marginalia/client/model/ClientRoute.java | 4 + .../marginalia/client/AbstractClientTest.java | 25 +-- .../descriptor/ServiceDescriptors.java | 2 + .../service/module/ConfigurationModule.java | 44 +++- .../search/svc/SearchErrorPageService.java | 42 +--- .../nu/marginalia/query/QueryService.java | 2 +- 11 files changed, 192 insertions(+), 164 deletions(-) create mode 100644 code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java diff --git a/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java b/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java index 94677317..8916120c 100644 --- a/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java +++ b/code/api/assistant-api/src/main/java/nu/marginalia/assistant/client/AssistantClient.java @@ -26,7 +26,7 @@ public class AssistantClient extends AbstractDynamicClient { public Observable dictionaryLookup(Context ctx, String word) { try { - return super.get(ctx, "/dictionary/" + URLEncoder.encode(word, StandardCharsets.UTF_8), DictionaryResponse.class); + return super.get(ctx, 0, "/dictionary/" + URLEncoder.encode(word, StandardCharsets.UTF_8), DictionaryResponse.class); } catch (RouteNotConfiguredException ex) { return Observable.empty(); @@ -36,7 +36,7 @@ public class AssistantClient extends AbstractDynamicClient { @SuppressWarnings("unchecked") public Observable> spellCheck(Context ctx, String word) { try { - return (Observable>) (Object) super.get(ctx, "/spell-check/" + URLEncoder.encode(word, StandardCharsets.UTF_8), List.class); + return (Observable>) (Object) super.get(ctx, 0, "/spell-check/" + URLEncoder.encode(word, StandardCharsets.UTF_8), List.class); } catch (RouteNotConfiguredException ex) { return Observable.empty(); @@ -44,7 +44,7 @@ public class AssistantClient extends AbstractDynamicClient { } public Observable unitConversion(Context ctx, String value, String from, String to) { try { - return super.get(ctx, "/unit-conversion?value=" + value + "&from=" + from + "&to=" + to); + return super.get(ctx, 0, "/unit-conversion?value=" + value + "&from=" + from + "&to=" + to); } catch (RouteNotConfiguredException ex) { return Observable.empty(); @@ -53,7 +53,7 @@ public class AssistantClient extends AbstractDynamicClient { public Observable evalMath(Context ctx, String expression) { try { - return super.get(ctx, "/eval-expression?value=" + URLEncoder.encode(expression, StandardCharsets.UTF_8)); + return super.get(ctx, 0, "/eval-expression?value=" + URLEncoder.encode(expression, StandardCharsets.UTF_8)); } catch (RouteNotConfiguredException ex) { return Observable.empty(); diff --git a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java index db60948d..d9d136e8 100644 --- a/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java +++ b/code/api/index-api/src/main/java/nu/marginalia/index/client/IndexClient.java @@ -27,7 +27,8 @@ public class IndexClient extends AbstractDynamicClient { @Inject public IndexClient(ServiceDescriptors descriptors, - MessageQueueFactory messageQueueFactory) { + MessageQueueFactory messageQueueFactory) + { super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get); String inboxName = ServiceId.Index.name + ":" + "0"; @@ -44,16 +45,16 @@ public class IndexClient extends AbstractDynamicClient { } @CheckReturnValue - public SearchResultSet query(Context ctx, SearchSpecification specs) { + public SearchResultSet query(Context ctx, int node, SearchSpecification specs) { return wmsa_search_index_api_time.time( - () -> this.postGet(ctx, "/search/", specs, SearchResultSet.class).blockingFirst() + () -> this.postGet(ctx, node,"/search/", specs, SearchResultSet.class).blockingFirst() ); } @CheckReturnValue - public Observable isBlocked(Context ctx) { - return super.get(ctx, "/is-blocked", Boolean.class); + public Observable isBlocked(Context ctx, int node) { + return super.get(ctx, node, "/is-blocked", Boolean.class); } } diff --git a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java index a995ba0e..742c8200 100644 --- a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java +++ b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java @@ -3,7 +3,6 @@ package nu.marginalia.query.client; import com.google.inject.Inject; import com.google.inject.Singleton; import io.prometheus.client.Summary; -import io.reactivex.rxjava3.core.Observable; import nu.marginalia.WmsaHome; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; @@ -49,13 +48,13 @@ public class QueryClient extends AbstractDynamicClient { @CheckReturnValue public SearchResultSet delegate(Context ctx, SearchSpecification specs) { return wmsa_search_index_api_delegate_time.time( - () -> this.postGet(ctx, "/delegate/", specs, SearchResultSet.class).blockingFirst() + () -> this.postGet(ctx, 0, "/delegate/", specs, SearchResultSet.class).blockingFirst() ); } @CheckReturnValue public QueryResponse search(Context ctx, QueryParams params) { return wmsa_search_index_api_search_time.time( - () -> this.postGet(ctx, "/search/", params, QueryResponse.class).blockingFirst() + () -> this.postGet(ctx, 0, "/search/", params, QueryResponse.class).blockingFirst() ); } public MqOutbox outbox() { diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java index 5be58520..20094ea9 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractClient.java @@ -10,21 +10,19 @@ import nu.marginalia.client.exception.LocalException; import nu.marginalia.client.exception.NetworkException; import nu.marginalia.client.exception.RemoteException; import nu.marginalia.client.exception.RouteNotConfiguredException; +import nu.marginalia.client.model.ClientRoute; import okhttp3.*; import org.apache.http.HttpHost; import org.apache.logging.log4j.ThreadContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.OutputStreamWriter; import java.net.ConnectException; -import java.util.Arrays; -import java.util.List; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; -import java.util.zip.GZIPOutputStream; public abstract class AbstractClient implements AutoCloseable { private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -34,19 +32,27 @@ public abstract class AbstractClient implements AutoCloseable { private final OkHttpClient client; private boolean quiet; - private String serviceRoute; + private final Map serviceRoutes; private int timeout; - - private volatile boolean alive; - private final Thread livenessMonitor; + private final LivenessMonitor livenessMonitor = new LivenessMonitor(); + private final Thread livenessMonitorThread; public void setTimeout(int timeout) { this.timeout = timeout; } - public AbstractClient(String host, int port, int timeout, Supplier gsonProvider) { - logger.info("Creating client for {}[{}:{}]", getClass().getSimpleName(), host, port); + public AbstractClient(ClientRoute route, int timeout, Supplier gsonProvider) { + this(Map.of(0, route), timeout, gsonProvider); + } + + public AbstractClient(Map routes, + int timeout, + Supplier gsonProvider) + { + routes.forEach((node, route) -> { + logger.info("Creating client route for {}:{} -> {}:{}", getClass().getSimpleName(), node, route.host(), route.port()); + }); this.gson = gsonProvider.get(); @@ -57,7 +63,12 @@ public abstract class AbstractClient implements AutoCloseable { .retryOnConnectionFailure(true) .followRedirects(true) .build(); - serviceRoute = new HttpHost(host, port).toURI(); + + serviceRoutes = new HashMap<>(routes.size()); + + routes.forEach((node, client) -> + serviceRoutes.put(node, new HttpHost(client.host(), client.port()).toURI()) + ); RxJavaPlugins.setErrorHandler(e -> { if (e.getMessage() == null) { @@ -67,52 +78,71 @@ public abstract class AbstractClient implements AutoCloseable { logger.error("Error {}: {}", e.getClass().getSimpleName(), e.getMessage()); } }); - livenessMonitor = new Thread(this::monitorLiveness, host + "-monitor"); - livenessMonitor.setDaemon(true); - livenessMonitor.start(); + + livenessMonitorThread = new Thread(livenessMonitor, getClass().getSimpleName() + "-monitor"); + livenessMonitorThread.setDaemon(true); + livenessMonitorThread.start(); logger.info("Finished creating client for {}", getClass().getSimpleName()); } - public void setServiceRoute(String hostname, int port) { - scheduler().abort(); - serviceRoute = new HttpHost(hostname, port).toURI(); - } + private class LivenessMonitor implements Runnable { + private final ConcurrentHashMap alivenessMap = new ConcurrentHashMap<>(); - protected String getServiceRoute() { - return serviceRoute; - } - - @SneakyThrows - private void monitorLiveness() { - Thread.sleep(100); // Wait for initialization - try { - for (; ; ) { - try { - alive = isResponsive(); - } - // - catch (Exception ex) { - logger.warn("Oops", ex); - } - synchronized (livenessMonitor) { - if (alive) { - livenessMonitor.wait(1000); + @SneakyThrows + public void run() { + Thread.sleep(100); // Wait for initialization + try { + for (; ; ) { + boolean allAlive = true; + try { + for (int node : serviceRoutes.keySet()) { + boolean isResponsive = isResponsive(node); + alivenessMap.put(node, isResponsive); + allAlive &= !isResponsive; + } + } + // + catch (Exception ex) { + logger.warn("Oops", ex); + } + if (allAlive) { + synchronized (this) { + wait(1000); + } + } + else { + Thread.sleep(100); } } - if (!alive) { - Thread.sleep(100); - } + } catch (InterruptedException ex) { + // nothing to see here } } - catch (InterruptedException ex) { - // nothing to see here + + public boolean isAlive(int node) { + return alivenessMap.getOrDefault(node, false); + } + + public synchronized boolean isResponsive(int node) { + Context ctx = Context.internal("ping"); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build(); + + return Observable.just(client.newCall(req)) + .subscribeOn(scheduler().get()) + .map(Call::execute) + .map(AbstractClient.this::getResponseStatus) + .flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500)) + .onErrorReturn(error -> 500) + .map(HttpStatusCode::new) + .map(HttpStatusCode::isGood) + .blockingFirst(); } } @Override public void close() { - livenessMonitor.interrupt(); + livenessMonitorThread.interrupt(); scheduler().close(); } @@ -124,25 +154,11 @@ public abstract class AbstractClient implements AutoCloseable { public abstract String name(); - public synchronized boolean isResponsive() { - Context ctx = Context.internal("ping"); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + "/internal/ping").get().build(); - - return Observable.just(client.newCall(req)) - .subscribeOn(scheduler().get()) - .map(Call::execute) - .map(this::getResponseStatus) - .flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500)) - .onErrorReturn(error -> 500) - .map(HttpStatusCode::new) - .map(HttpStatusCode::isGood) - .blockingFirst(); - } public synchronized boolean isAccepting() { Context ctx = Context.internal("ready"); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + "/internal/ready").get().build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(0) + "/internal/ready").get().build(); return Observable.just(client.newCall(req)) .subscribeOn(scheduler().get()) @@ -157,13 +173,16 @@ public abstract class AbstractClient implements AutoCloseable { } @SneakyThrows - protected synchronized Observable post(Context ctx, String endpoint, Object data) { + protected synchronized Observable post(Context ctx, + int node, + String endpoint, + Object data) { - ensureAlive(); + ensureAlive(node); RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json; charset=utf-8")); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build(); return Observable .just(client.newCall(req)) @@ -180,17 +199,17 @@ public abstract class AbstractClient implements AutoCloseable { } @SneakyThrows - protected synchronized Observable post(Context ctx, String endpoint, GeneratedMessageV3 data) { + protected synchronized Observable post(Context ctx, int node, String endpoint, GeneratedMessageV3 data) { - ensureAlive(); + ensureAlive(0); RequestBody body = RequestBody.create(data.toByteArray(), MediaType.parse("application/protobuf")); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build(); var call = client.newCall(req); logInbound(call); - ThreadContext.put("outbound-request", serviceRoute + endpoint); + ThreadContext.put("outbound-request", serviceRoutes.get(node) + endpoint); try (var rsp = call.execute()) { logOutbound(rsp); int code = rsp.code(); @@ -204,12 +223,12 @@ public abstract class AbstractClient implements AutoCloseable { @SneakyThrows - protected synchronized Observable postGet(Context ctx, String endpoint, Object data, Class returnType) { + protected synchronized Observable postGet(Context ctx, int node, String endpoint, Object data, Class returnType) { - ensureAlive(); + ensureAlive(0); RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json")); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build(); return Observable.just(client.newCall(req)) .subscribeOn(scheduler().get()) @@ -223,18 +242,18 @@ public abstract class AbstractClient implements AutoCloseable { .doFinally(() -> ThreadContext.remove("outbound-request")); } - protected synchronized Observable post(Context ctx, String endpoint, String data, MediaType mediaType) { - ensureAlive(); + protected synchronized Observable post(Context ctx, int node, String endpoint, String data, MediaType mediaType) { + ensureAlive(0); var body = RequestBody.create(data, mediaType); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build(); var call = client.newCall(req); return Observable.just(call) .map((c) -> { - ThreadContext.put(CONTEXT_OUTBOUND_REQUEST, serviceRoute + endpoint); + ThreadContext.put(CONTEXT_OUTBOUND_REQUEST, serviceRoutes.get(node) + endpoint); return c; }) .subscribeOn(scheduler().get()) @@ -249,10 +268,10 @@ public abstract class AbstractClient implements AutoCloseable { .doFinally(() -> ThreadContext.remove("outbound-request")); } - protected synchronized Observable get(Context ctx, String endpoint, Class type) { - ensureAlive(); + protected synchronized Observable get(Context ctx, int node, String endpoint, Class type) { + ensureAlive(0); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).get().build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build(); return Observable.just(client.newCall(req)) .subscribeOn(scheduler().get()) @@ -267,10 +286,10 @@ public abstract class AbstractClient implements AutoCloseable { } @SuppressWarnings("unchecked") - protected synchronized Observable get(Context ctx, String endpoint) { - ensureAlive(); + protected synchronized Observable get(Context ctx, int node, String endpoint) { + ensureAlive(0); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).get().build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build(); return Observable.just(client.newCall(req)) .subscribeOn(scheduler().get()) @@ -284,10 +303,10 @@ public abstract class AbstractClient implements AutoCloseable { .doFinally(() -> ThreadContext.remove("outbound-request")); } - protected synchronized Observable delete(Context ctx, String endpoint) { - ensureAlive(); + protected synchronized Observable delete(Context ctx, int node, String endpoint) { + ensureAlive(0); - var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).delete().build(); + var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).delete().build(); return Observable.just(client.newCall(req)) .subscribeOn(scheduler().get()) @@ -314,11 +333,11 @@ public abstract class AbstractClient implements AutoCloseable { } @SneakyThrows - private void ensureAlive() { - if (!isAlive()) { + private void ensureAlive(int node) { + if (!isAlive(node)) { wait(2000); - if (!isAlive()) { - throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoute); + if (!isAlive(node)) { + throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoutes.get(node)); } } } @@ -331,6 +350,7 @@ public abstract class AbstractClient implements AutoCloseable { private Observable filterRetryableExceptions(Throwable error) throws Throwable { synchronized (livenessMonitor) { + // Signal to the liveness monitor that we may have an outage livenessMonitor.notifyAll(); } @@ -402,8 +422,8 @@ public abstract class AbstractClient implements AutoCloseable { } - public boolean isAlive() { - return alive; + public boolean isAlive(int node) { + return livenessMonitor.isAlive(node); } private String json(Object o) { diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java index f85c2898..c3d390b6 100644 --- a/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java +++ b/code/common/service-client/src/main/java/nu/marginalia/client/AbstractDynamicClient.java @@ -3,6 +3,7 @@ package nu.marginalia.client; import com.google.gson.Gson; import io.reactivex.rxjava3.core.Observable; import lombok.SneakyThrows; +import nu.marginalia.client.model.ClientRoute; import nu.marginalia.service.descriptor.ServiceDescriptor; import nu.marginalia.service.descriptor.HostsFile; import org.slf4j.Logger; @@ -17,7 +18,11 @@ public class AbstractDynamicClient extends AbstractClient { private final AbortingScheduler scheduler; public AbstractDynamicClient(@Nonnull ServiceDescriptor service, HostsFile hosts, Supplier gsonProvider) { - super(hosts.getHost(service), service.port, 10, gsonProvider); + super( + new ClientRoute(hosts.getHost(service), service.port), + 10, + gsonProvider + ); this.service = service; this.scheduler = new AbortingScheduler(name()); @@ -32,14 +37,6 @@ public class AbstractDynamicClient extends AbstractClient { return service; } - @SneakyThrows - public void blockingWait() { - logger.info("Waiting for route to {} ({})", service, getServiceRoute()); - while (!isAlive()) { - Thread.sleep(1000); - } - } - @Override public AbortingScheduler scheduler() { return scheduler; diff --git a/code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java b/code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java new file mode 100644 index 00000000..84d6abff --- /dev/null +++ b/code/common/service-client/src/main/java/nu/marginalia/client/model/ClientRoute.java @@ -0,0 +1,4 @@ +package nu.marginalia.client.model; + +public record ClientRoute(String host, int port) { +} diff --git a/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java b/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java index db590adf..27c3bdec 100644 --- a/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java +++ b/code/common/service-client/src/test/java/nu/marginalia/client/AbstractClientTest.java @@ -5,6 +5,7 @@ import io.reactivex.rxjava3.core.Observable; import lombok.AllArgsConstructor; import lombok.Data; import lombok.SneakyThrows; +import nu.marginalia.client.model.ClientRoute; import org.junit.jupiter.api.*; import spark.Request; import spark.Response; @@ -33,7 +34,7 @@ public class AbstractClientTest { int port = new Random().nextInt(6000, 10000); testServer = new TestServer(port); - client = new AbstractClient("localhost", port, 1, Gson::new) { + client = new AbstractClient(new ClientRoute("localhost", port), 1, Gson::new) { @Override public AbortingScheduler scheduler() { return new AbortingScheduler(name()); @@ -79,47 +80,47 @@ public class AbstractClientTest { public void testGetTimeout() { testServer.get(this::timeout); - assertError(client.get(Context.internal(), "/get")); + assertError(client.get(Context.internal(), 0, "/get")); } @Test public void testPostTimeout() { testServer.post(this::timeout); - assertError(client.post(Context.internal(), "/post", "test")); + assertError(client.post(Context.internal(), 0, "/post", "test")); } @Test public void testDeleteTimeout() { testServer.delete(this::timeout); - assertError(client.delete(Context.internal(), "/post")); + assertError(client.delete(Context.internal(), 0,"/post")); } @Test public void testPost404() { testServer.post(this::error404); - assertError(client.post(Context.internal(), "/post", "test")); + assertError(client.post(Context.internal(), 0,"/post", "test")); } @Test public void testGet404() { testServer.get(this::error404); - assertError(client.get(Context.internal(), "/get")); + assertError(client.get(Context.internal(), 0,"/get")); } @Test public void testDelete404() { testServer.delete(this::error404); - assertError(client.delete(Context.internal(), "/delete")); + assertError(client.delete(Context.internal(),0, "/delete")); } @Test public void testGet() { testServer.get((req, rsp) -> "Hello World"); - assertEquals("Hello World", client.get(Context.internal(), "/get").blockingFirst()); + assertEquals("Hello World", client.get(Context.internal(), 0,"/get").blockingFirst()); } @Test @@ -138,7 +139,7 @@ public class AbstractClientTest { public void testGetJson() { testServer.get((req, rsp) -> new DummyObject(5, "23"), new Gson()::toJson); - assertEquals(client.get(Context.internal(), "/get", DummyObject.class).blockingFirst(), + assertEquals(client.get(Context.internal(), 0,"/get", DummyObject.class).blockingFirst(), new DummyObject(5, "23")); } @@ -147,7 +148,7 @@ public class AbstractClientTest { public void testDelete() { testServer.delete((req, rsp) -> "Hello World"); - assertTrue(client.delete(Context.internal(), "/delete").blockingFirst().isGood()); + assertTrue(client.delete(Context.internal(), 0,"/delete").blockingFirst().isGood()); } @@ -159,7 +160,7 @@ public class AbstractClientTest { return "ok"; }); - client.post(Context.internal(), "/post", new DummyObject(5, "23")).blockingSubscribe(); + client.post(Context.internal(),0, "/post", new DummyObject(5, "23")).blockingSubscribe(); assertEquals(1, inbox.size()); assertEquals(new DummyObject(5, "23"), inbox.get(0)); } @@ -172,7 +173,7 @@ public class AbstractClientTest { return new DummyObject(1, "ret"); }, gson::toJson); - var ret = client.postGet(Context.internal(), "/post", new DummyObject(5, "23"), DummyObject.class).blockingFirst(); + var ret = client.postGet(Context.internal(), 0,"/post", new DummyObject(5, "23"), DummyObject.class).blockingFirst(); assertEquals(1, inbox.size()); assertEquals(new DummyObject(5, "23"), inbox.get(0)); assertEquals(new DummyObject(1, "ret"), ret); diff --git a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptors.java b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptors.java index 7fdbce0d..88f2a693 100644 --- a/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptors.java +++ b/code/common/service-discovery/src/main/java/nu/marginalia/service/descriptor/ServiceDescriptors.java @@ -1,5 +1,6 @@ package nu.marginalia.service.descriptor; +import nu.marginalia.service.SearchServiceDescriptors; import nu.marginalia.service.id.ServiceId; import java.util.LinkedHashMap; @@ -7,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; +/** @see SearchServiceDescriptors */ public class ServiceDescriptors { private final Map descriptorsAll = new LinkedHashMap<>(); diff --git a/code/common/service/src/main/java/nu/marginalia/service/module/ConfigurationModule.java b/code/common/service/src/main/java/nu/marginalia/service/module/ConfigurationModule.java index 400c173b..b035dc52 100644 --- a/code/common/service/src/main/java/nu/marginalia/service/module/ConfigurationModule.java +++ b/code/common/service/src/main/java/nu/marginalia/service/module/ConfigurationModule.java @@ -1,9 +1,6 @@ package nu.marginalia.service.module; import com.google.inject.AbstractModule; -import com.google.inject.Provides; -import com.google.inject.name.Named; -import com.google.inject.name.Names; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; @@ -22,12 +19,45 @@ public class ConfigurationModule extends AbstractModule { public void configure() { bind(ServiceDescriptors.class).toInstance(descriptors); - int basePort = descriptors.forId(id).port; - int prometheusPort = basePort + 1000; - String host = System.getProperty("service-host", "127.0.0.1"); - var configObject = new ServiceConfiguration(id, 0, host, basePort, prometheusPort, UUID.randomUUID()); + var configObject = new ServiceConfiguration(id, + getNode(), + getHost(), + getBasePort(), + getPrometheusPort(), + UUID.randomUUID() + ); bind(ServiceConfiguration.class).toInstance(configObject); } + private int getBasePort() { + String port = System.getenv("WMSA_SERVICE_PORT"); + + if (port != null) { + return Integer.parseInt(port); + } + + return descriptors.forId(id).port; + } + + private int getPrometheusPort() { + String prometheusPortEnv = System.getenv("WMSA_PROMETHEUS_PORT"); + + if (prometheusPortEnv != null) { + return Integer.parseInt(prometheusPortEnv); + } + + return descriptors.forId(id).port + 1000; + } + + private int getNode() { + String nodeEnv = Objects.requireNonNullElse(System.getenv("WMSA_SERVICE_NODE"), "0"); + + return Integer.parseInt(nodeEnv); + } + + private String getHost() { + return System.getProperty("service-host", "127.0.0.1"); + } + } diff --git a/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchErrorPageService.java b/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchErrorPageService.java index 03ae6297..68f55b4f 100644 --- a/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchErrorPageService.java +++ b/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchErrorPageService.java @@ -28,40 +28,14 @@ public class SearchErrorPageService { } public void serveError(Context ctx, Request request, Response rsp) { - boolean isIndexUp = indexClient.isAlive(); - - try { - if (!isIndexUp) { - rsp.body(renderError(request, "The index is down", - """ - The search index server appears to be down. -

- The server was possibly restarted to bring online some changes. - Restarting the index typically takes a few minutes, during which - searches can't be served. - """)); - } else if (indexClient.isBlocked(ctx).blockingFirst()) { - rsp.body(renderError(request, "The index is starting up", - """ - The search index server appears to be in the process of starting up. - This typically takes a few minutes. Be patient. - """)); - } - else { - rsp.body(renderError(request, "Error processing request", - """ - The search index appears to be up and running, so the problem may be related - to some wider general error, or pertain to an error handling your query. - """)); - } - } - catch (Exception ex) { - logger.warn("Error during rendering of error page", ex); - rsp.body(renderError(request, "Error processing error", - """ - An error has occurred, additionally, an error occurred while handling that error - """)); - } + rsp.body(renderError(request, "Internal error", + """ + An error occurred when communicating with the search engine index. +

+ This is hopefully a temporary state of affairs. It may be due to + an upgrade. The index typically takes a about two or three minutes + to reload from a cold restart. Thanks for your patience. + """)); } private String renderError(Request request, String title, String message) { diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java index 42e335eb..3b56b8f3 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java @@ -73,7 +73,7 @@ public class QueryService extends Service { } private SearchResultSet executeQuery(Context ctx, SearchSpecification query) { - return indexClient.query(ctx, query); + return indexClient.query(ctx, 0, query); } private boolean isBlacklisted(DecoratedSearchResultItem item) {