(service, client) First steps towards multiple nodedness
This commit is contained in:
parent
8375237de5
commit
61288c5e68
@ -26,7 +26,7 @@ public class AssistantClient extends AbstractDynamicClient {
|
|||||||
|
|
||||||
public Observable<DictionaryResponse> dictionaryLookup(Context ctx, String word) {
|
public Observable<DictionaryResponse> dictionaryLookup(Context ctx, String word) {
|
||||||
try {
|
try {
|
||||||
return super.get(ctx, "/dictionary/" + URLEncoder.encode(word, StandardCharsets.UTF_8), DictionaryResponse.class);
|
return super.get(ctx, 0, "/dictionary/" + URLEncoder.encode(word, StandardCharsets.UTF_8), DictionaryResponse.class);
|
||||||
}
|
}
|
||||||
catch (RouteNotConfiguredException ex) {
|
catch (RouteNotConfiguredException ex) {
|
||||||
return Observable.empty();
|
return Observable.empty();
|
||||||
@ -36,7 +36,7 @@ public class AssistantClient extends AbstractDynamicClient {
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public Observable<List<String>> spellCheck(Context ctx, String word) {
|
public Observable<List<String>> spellCheck(Context ctx, String word) {
|
||||||
try {
|
try {
|
||||||
return (Observable<List<String>>) (Object) super.get(ctx, "/spell-check/" + URLEncoder.encode(word, StandardCharsets.UTF_8), List.class);
|
return (Observable<List<String>>) (Object) super.get(ctx, 0, "/spell-check/" + URLEncoder.encode(word, StandardCharsets.UTF_8), List.class);
|
||||||
}
|
}
|
||||||
catch (RouteNotConfiguredException ex) {
|
catch (RouteNotConfiguredException ex) {
|
||||||
return Observable.empty();
|
return Observable.empty();
|
||||||
@ -44,7 +44,7 @@ public class AssistantClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
public Observable<String> unitConversion(Context ctx, String value, String from, String to) {
|
public Observable<String> unitConversion(Context ctx, String value, String from, String to) {
|
||||||
try {
|
try {
|
||||||
return super.get(ctx, "/unit-conversion?value=" + value + "&from=" + from + "&to=" + to);
|
return super.get(ctx, 0, "/unit-conversion?value=" + value + "&from=" + from + "&to=" + to);
|
||||||
}
|
}
|
||||||
catch (RouteNotConfiguredException ex) {
|
catch (RouteNotConfiguredException ex) {
|
||||||
return Observable.empty();
|
return Observable.empty();
|
||||||
@ -53,7 +53,7 @@ public class AssistantClient extends AbstractDynamicClient {
|
|||||||
|
|
||||||
public Observable<String> evalMath(Context ctx, String expression) {
|
public Observable<String> evalMath(Context ctx, String expression) {
|
||||||
try {
|
try {
|
||||||
return super.get(ctx, "/eval-expression?value=" + URLEncoder.encode(expression, StandardCharsets.UTF_8));
|
return super.get(ctx, 0, "/eval-expression?value=" + URLEncoder.encode(expression, StandardCharsets.UTF_8));
|
||||||
}
|
}
|
||||||
catch (RouteNotConfiguredException ex) {
|
catch (RouteNotConfiguredException ex) {
|
||||||
return Observable.empty();
|
return Observable.empty();
|
||||||
|
@ -27,7 +27,8 @@ public class IndexClient extends AbstractDynamicClient {
|
|||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public IndexClient(ServiceDescriptors descriptors,
|
public IndexClient(ServiceDescriptors descriptors,
|
||||||
MessageQueueFactory messageQueueFactory) {
|
MessageQueueFactory messageQueueFactory)
|
||||||
|
{
|
||||||
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
|
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
|
||||||
|
|
||||||
String inboxName = ServiceId.Index.name + ":" + "0";
|
String inboxName = ServiceId.Index.name + ":" + "0";
|
||||||
@ -44,16 +45,16 @@ public class IndexClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@CheckReturnValue
|
@CheckReturnValue
|
||||||
public SearchResultSet query(Context ctx, SearchSpecification specs) {
|
public SearchResultSet query(Context ctx, int node, SearchSpecification specs) {
|
||||||
return wmsa_search_index_api_time.time(
|
return wmsa_search_index_api_time.time(
|
||||||
() -> this.postGet(ctx, "/search/", specs, SearchResultSet.class).blockingFirst()
|
() -> this.postGet(ctx, node,"/search/", specs, SearchResultSet.class).blockingFirst()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@CheckReturnValue
|
@CheckReturnValue
|
||||||
public Observable<Boolean> isBlocked(Context ctx) {
|
public Observable<Boolean> isBlocked(Context ctx, int node) {
|
||||||
return super.get(ctx, "/is-blocked", Boolean.class);
|
return super.get(ctx, node, "/is-blocked", Boolean.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ package nu.marginalia.query.client;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import io.prometheus.client.Summary;
|
import io.prometheus.client.Summary;
|
||||||
import io.reactivex.rxjava3.core.Observable;
|
|
||||||
import nu.marginalia.WmsaHome;
|
import nu.marginalia.WmsaHome;
|
||||||
import nu.marginalia.client.AbstractDynamicClient;
|
import nu.marginalia.client.AbstractDynamicClient;
|
||||||
import nu.marginalia.client.Context;
|
import nu.marginalia.client.Context;
|
||||||
@ -49,13 +48,13 @@ public class QueryClient extends AbstractDynamicClient {
|
|||||||
@CheckReturnValue
|
@CheckReturnValue
|
||||||
public SearchResultSet delegate(Context ctx, SearchSpecification specs) {
|
public SearchResultSet delegate(Context ctx, SearchSpecification specs) {
|
||||||
return wmsa_search_index_api_delegate_time.time(
|
return wmsa_search_index_api_delegate_time.time(
|
||||||
() -> this.postGet(ctx, "/delegate/", specs, SearchResultSet.class).blockingFirst()
|
() -> this.postGet(ctx, 0, "/delegate/", specs, SearchResultSet.class).blockingFirst()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
@CheckReturnValue
|
@CheckReturnValue
|
||||||
public QueryResponse search(Context ctx, QueryParams params) {
|
public QueryResponse search(Context ctx, QueryParams params) {
|
||||||
return wmsa_search_index_api_search_time.time(
|
return wmsa_search_index_api_search_time.time(
|
||||||
() -> this.postGet(ctx, "/search/", params, QueryResponse.class).blockingFirst()
|
() -> this.postGet(ctx, 0, "/search/", params, QueryResponse.class).blockingFirst()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
public MqOutbox outbox() {
|
public MqOutbox outbox() {
|
||||||
|
@ -10,21 +10,19 @@ import nu.marginalia.client.exception.LocalException;
|
|||||||
import nu.marginalia.client.exception.NetworkException;
|
import nu.marginalia.client.exception.NetworkException;
|
||||||
import nu.marginalia.client.exception.RemoteException;
|
import nu.marginalia.client.exception.RemoteException;
|
||||||
import nu.marginalia.client.exception.RouteNotConfiguredException;
|
import nu.marginalia.client.exception.RouteNotConfiguredException;
|
||||||
|
import nu.marginalia.client.model.ClientRoute;
|
||||||
import okhttp3.*;
|
import okhttp3.*;
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
import org.apache.logging.log4j.ThreadContext;
|
import org.apache.logging.log4j.ThreadContext;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.net.ConnectException;
|
import java.net.ConnectException;
|
||||||
import java.util.Arrays;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
import java.util.zip.GZIPOutputStream;
|
|
||||||
|
|
||||||
public abstract class AbstractClient implements AutoCloseable {
|
public abstract class AbstractClient implements AutoCloseable {
|
||||||
private final Logger logger = LoggerFactory.getLogger(getClass());
|
private final Logger logger = LoggerFactory.getLogger(getClass());
|
||||||
@ -34,19 +32,27 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
private final OkHttpClient client;
|
private final OkHttpClient client;
|
||||||
|
|
||||||
private boolean quiet;
|
private boolean quiet;
|
||||||
private String serviceRoute;
|
private final Map<Integer, String> serviceRoutes;
|
||||||
private int timeout;
|
private int timeout;
|
||||||
|
|
||||||
|
private final LivenessMonitor livenessMonitor = new LivenessMonitor();
|
||||||
private volatile boolean alive;
|
private final Thread livenessMonitorThread;
|
||||||
private final Thread livenessMonitor;
|
|
||||||
|
|
||||||
public void setTimeout(int timeout) {
|
public void setTimeout(int timeout) {
|
||||||
this.timeout = timeout;
|
this.timeout = timeout;
|
||||||
}
|
}
|
||||||
|
|
||||||
public AbstractClient(String host, int port, int timeout, Supplier<Gson> gsonProvider) {
|
public AbstractClient(ClientRoute route, int timeout, Supplier<Gson> gsonProvider) {
|
||||||
logger.info("Creating client for {}[{}:{}]", getClass().getSimpleName(), host, port);
|
this(Map.of(0, route), timeout, gsonProvider);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AbstractClient(Map<Integer, ClientRoute> routes,
|
||||||
|
int timeout,
|
||||||
|
Supplier<Gson> gsonProvider)
|
||||||
|
{
|
||||||
|
routes.forEach((node, route) -> {
|
||||||
|
logger.info("Creating client route for {}:{} -> {}:{}", getClass().getSimpleName(), node, route.host(), route.port());
|
||||||
|
});
|
||||||
|
|
||||||
this.gson = gsonProvider.get();
|
this.gson = gsonProvider.get();
|
||||||
|
|
||||||
@ -57,7 +63,12 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
.retryOnConnectionFailure(true)
|
.retryOnConnectionFailure(true)
|
||||||
.followRedirects(true)
|
.followRedirects(true)
|
||||||
.build();
|
.build();
|
||||||
serviceRoute = new HttpHost(host, port).toURI();
|
|
||||||
|
serviceRoutes = new HashMap<>(routes.size());
|
||||||
|
|
||||||
|
routes.forEach((node, client) ->
|
||||||
|
serviceRoutes.put(node, new HttpHost(client.host(), client.port()).toURI())
|
||||||
|
);
|
||||||
|
|
||||||
RxJavaPlugins.setErrorHandler(e -> {
|
RxJavaPlugins.setErrorHandler(e -> {
|
||||||
if (e.getMessage() == null) {
|
if (e.getMessage() == null) {
|
||||||
@ -67,52 +78,71 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
logger.error("Error {}: {}", e.getClass().getSimpleName(), e.getMessage());
|
logger.error("Error {}: {}", e.getClass().getSimpleName(), e.getMessage());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
livenessMonitor = new Thread(this::monitorLiveness, host + "-monitor");
|
|
||||||
livenessMonitor.setDaemon(true);
|
livenessMonitorThread = new Thread(livenessMonitor, getClass().getSimpleName() + "-monitor");
|
||||||
livenessMonitor.start();
|
livenessMonitorThread.setDaemon(true);
|
||||||
|
livenessMonitorThread.start();
|
||||||
|
|
||||||
logger.info("Finished creating client for {}", getClass().getSimpleName());
|
logger.info("Finished creating client for {}", getClass().getSimpleName());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setServiceRoute(String hostname, int port) {
|
private class LivenessMonitor implements Runnable {
|
||||||
scheduler().abort();
|
private final ConcurrentHashMap<Integer, Boolean> alivenessMap = new ConcurrentHashMap<>();
|
||||||
serviceRoute = new HttpHost(hostname, port).toURI();
|
|
||||||
}
|
|
||||||
|
|
||||||
protected String getServiceRoute() {
|
@SneakyThrows
|
||||||
return serviceRoute;
|
public void run() {
|
||||||
}
|
Thread.sleep(100); // Wait for initialization
|
||||||
|
try {
|
||||||
@SneakyThrows
|
for (; ; ) {
|
||||||
private void monitorLiveness() {
|
boolean allAlive = true;
|
||||||
Thread.sleep(100); // Wait for initialization
|
try {
|
||||||
try {
|
for (int node : serviceRoutes.keySet()) {
|
||||||
for (; ; ) {
|
boolean isResponsive = isResponsive(node);
|
||||||
try {
|
alivenessMap.put(node, isResponsive);
|
||||||
alive = isResponsive();
|
allAlive &= !isResponsive;
|
||||||
}
|
}
|
||||||
//
|
}
|
||||||
catch (Exception ex) {
|
//
|
||||||
logger.warn("Oops", ex);
|
catch (Exception ex) {
|
||||||
}
|
logger.warn("Oops", ex);
|
||||||
synchronized (livenessMonitor) {
|
}
|
||||||
if (alive) {
|
if (allAlive) {
|
||||||
livenessMonitor.wait(1000);
|
synchronized (this) {
|
||||||
|
wait(1000);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!alive) {
|
} catch (InterruptedException ex) {
|
||||||
Thread.sleep(100);
|
// nothing to see here
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
catch (InterruptedException ex) {
|
|
||||||
// nothing to see here
|
public boolean isAlive(int node) {
|
||||||
|
return alivenessMap.getOrDefault(node, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized boolean isResponsive(int node) {
|
||||||
|
Context ctx = Context.internal("ping");
|
||||||
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build();
|
||||||
|
|
||||||
|
return Observable.just(client.newCall(req))
|
||||||
|
.subscribeOn(scheduler().get())
|
||||||
|
.map(Call::execute)
|
||||||
|
.map(AbstractClient.this::getResponseStatus)
|
||||||
|
.flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500))
|
||||||
|
.onErrorReturn(error -> 500)
|
||||||
|
.map(HttpStatusCode::new)
|
||||||
|
.map(HttpStatusCode::isGood)
|
||||||
|
.blockingFirst();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void close() {
|
public void close() {
|
||||||
livenessMonitor.interrupt();
|
livenessMonitorThread.interrupt();
|
||||||
scheduler().close();
|
scheduler().close();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -124,25 +154,11 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
|
|
||||||
public abstract String name();
|
public abstract String name();
|
||||||
|
|
||||||
public synchronized boolean isResponsive() {
|
|
||||||
Context ctx = Context.internal("ping");
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + "/internal/ping").get().build();
|
|
||||||
|
|
||||||
return Observable.just(client.newCall(req))
|
|
||||||
.subscribeOn(scheduler().get())
|
|
||||||
.map(Call::execute)
|
|
||||||
.map(this::getResponseStatus)
|
|
||||||
.flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500))
|
|
||||||
.onErrorReturn(error -> 500)
|
|
||||||
.map(HttpStatusCode::new)
|
|
||||||
.map(HttpStatusCode::isGood)
|
|
||||||
.blockingFirst();
|
|
||||||
}
|
|
||||||
|
|
||||||
public synchronized boolean isAccepting() {
|
public synchronized boolean isAccepting() {
|
||||||
Context ctx = Context.internal("ready");
|
Context ctx = Context.internal("ready");
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + "/internal/ready").get().build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(0) + "/internal/ready").get().build();
|
||||||
|
|
||||||
return Observable.just(client.newCall(req))
|
return Observable.just(client.newCall(req))
|
||||||
.subscribeOn(scheduler().get())
|
.subscribeOn(scheduler().get())
|
||||||
@ -157,13 +173,16 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
protected synchronized Observable<HttpStatusCode> post(Context ctx, String endpoint, Object data) {
|
protected synchronized Observable<HttpStatusCode> post(Context ctx,
|
||||||
|
int node,
|
||||||
|
String endpoint,
|
||||||
|
Object data) {
|
||||||
|
|
||||||
ensureAlive();
|
ensureAlive(node);
|
||||||
|
|
||||||
RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json; charset=utf-8"));
|
RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json; charset=utf-8"));
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build();
|
||||||
|
|
||||||
return Observable
|
return Observable
|
||||||
.just(client.newCall(req))
|
.just(client.newCall(req))
|
||||||
@ -180,17 +199,17 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
protected synchronized Observable<HttpStatusCode> post(Context ctx, String endpoint, GeneratedMessageV3 data) {
|
protected synchronized Observable<HttpStatusCode> post(Context ctx, int node, String endpoint, GeneratedMessageV3 data) {
|
||||||
|
|
||||||
ensureAlive();
|
ensureAlive(0);
|
||||||
|
|
||||||
RequestBody body = RequestBody.create(data.toByteArray(), MediaType.parse("application/protobuf"));
|
RequestBody body = RequestBody.create(data.toByteArray(), MediaType.parse("application/protobuf"));
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build();
|
||||||
var call = client.newCall(req);
|
var call = client.newCall(req);
|
||||||
|
|
||||||
logInbound(call);
|
logInbound(call);
|
||||||
ThreadContext.put("outbound-request", serviceRoute + endpoint);
|
ThreadContext.put("outbound-request", serviceRoutes.get(node) + endpoint);
|
||||||
try (var rsp = call.execute()) {
|
try (var rsp = call.execute()) {
|
||||||
logOutbound(rsp);
|
logOutbound(rsp);
|
||||||
int code = rsp.code();
|
int code = rsp.code();
|
||||||
@ -204,12 +223,12 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
|
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
protected synchronized <T> Observable<T> postGet(Context ctx, String endpoint, Object data, Class<T> returnType) {
|
protected synchronized <T> Observable<T> postGet(Context ctx, int node, String endpoint, Object data, Class<T> returnType) {
|
||||||
|
|
||||||
ensureAlive();
|
ensureAlive(0);
|
||||||
|
|
||||||
RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json"));
|
RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json"));
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build();
|
||||||
|
|
||||||
return Observable.just(client.newCall(req))
|
return Observable.just(client.newCall(req))
|
||||||
.subscribeOn(scheduler().get())
|
.subscribeOn(scheduler().get())
|
||||||
@ -223,18 +242,18 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
.doFinally(() -> ThreadContext.remove("outbound-request"));
|
.doFinally(() -> ThreadContext.remove("outbound-request"));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized Observable<HttpStatusCode> post(Context ctx, String endpoint, String data, MediaType mediaType) {
|
protected synchronized Observable<HttpStatusCode> post(Context ctx, int node, String endpoint, String data, MediaType mediaType) {
|
||||||
ensureAlive();
|
ensureAlive(0);
|
||||||
|
|
||||||
var body = RequestBody.create(data, mediaType);
|
var body = RequestBody.create(data, mediaType);
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).post(body).build();
|
||||||
var call = client.newCall(req);
|
var call = client.newCall(req);
|
||||||
|
|
||||||
|
|
||||||
return Observable.just(call)
|
return Observable.just(call)
|
||||||
.map((c) -> {
|
.map((c) -> {
|
||||||
ThreadContext.put(CONTEXT_OUTBOUND_REQUEST, serviceRoute + endpoint);
|
ThreadContext.put(CONTEXT_OUTBOUND_REQUEST, serviceRoutes.get(node) + endpoint);
|
||||||
return c;
|
return c;
|
||||||
})
|
})
|
||||||
.subscribeOn(scheduler().get())
|
.subscribeOn(scheduler().get())
|
||||||
@ -249,10 +268,10 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
.doFinally(() -> ThreadContext.remove("outbound-request"));
|
.doFinally(() -> ThreadContext.remove("outbound-request"));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized <T> Observable<T> get(Context ctx, String endpoint, Class<T> type) {
|
protected synchronized <T> Observable<T> get(Context ctx, int node, String endpoint, Class<T> type) {
|
||||||
ensureAlive();
|
ensureAlive(0);
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).get().build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build();
|
||||||
|
|
||||||
return Observable.just(client.newCall(req))
|
return Observable.just(client.newCall(req))
|
||||||
.subscribeOn(scheduler().get())
|
.subscribeOn(scheduler().get())
|
||||||
@ -267,10 +286,10 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected synchronized Observable<String> get(Context ctx, String endpoint) {
|
protected synchronized Observable<String> get(Context ctx, int node, String endpoint) {
|
||||||
ensureAlive();
|
ensureAlive(0);
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).get().build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).get().build();
|
||||||
|
|
||||||
return Observable.just(client.newCall(req))
|
return Observable.just(client.newCall(req))
|
||||||
.subscribeOn(scheduler().get())
|
.subscribeOn(scheduler().get())
|
||||||
@ -284,10 +303,10 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
.doFinally(() -> ThreadContext.remove("outbound-request"));
|
.doFinally(() -> ThreadContext.remove("outbound-request"));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected synchronized Observable<HttpStatusCode> delete(Context ctx, String endpoint) {
|
protected synchronized Observable<HttpStatusCode> delete(Context ctx, int node, String endpoint) {
|
||||||
ensureAlive();
|
ensureAlive(0);
|
||||||
|
|
||||||
var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).delete().build();
|
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + endpoint).delete().build();
|
||||||
|
|
||||||
return Observable.just(client.newCall(req))
|
return Observable.just(client.newCall(req))
|
||||||
.subscribeOn(scheduler().get())
|
.subscribeOn(scheduler().get())
|
||||||
@ -314,11 +333,11 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
private void ensureAlive() {
|
private void ensureAlive(int node) {
|
||||||
if (!isAlive()) {
|
if (!isAlive(node)) {
|
||||||
wait(2000);
|
wait(2000);
|
||||||
if (!isAlive()) {
|
if (!isAlive(node)) {
|
||||||
throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoute);
|
throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoutes.get(node));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -331,6 +350,7 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
private Observable<Throwable> filterRetryableExceptions(Throwable error) throws Throwable {
|
private Observable<Throwable> filterRetryableExceptions(Throwable error) throws Throwable {
|
||||||
|
|
||||||
synchronized (livenessMonitor) {
|
synchronized (livenessMonitor) {
|
||||||
|
// Signal to the liveness monitor that we may have an outage
|
||||||
livenessMonitor.notifyAll();
|
livenessMonitor.notifyAll();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -402,8 +422,8 @@ public abstract class AbstractClient implements AutoCloseable {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isAlive() {
|
public boolean isAlive(int node) {
|
||||||
return alive;
|
return livenessMonitor.isAlive(node);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String json(Object o) {
|
private String json(Object o) {
|
||||||
|
@ -3,6 +3,7 @@ package nu.marginalia.client;
|
|||||||
import com.google.gson.Gson;
|
import com.google.gson.Gson;
|
||||||
import io.reactivex.rxjava3.core.Observable;
|
import io.reactivex.rxjava3.core.Observable;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
|
import nu.marginalia.client.model.ClientRoute;
|
||||||
import nu.marginalia.service.descriptor.ServiceDescriptor;
|
import nu.marginalia.service.descriptor.ServiceDescriptor;
|
||||||
import nu.marginalia.service.descriptor.HostsFile;
|
import nu.marginalia.service.descriptor.HostsFile;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
@ -17,7 +18,11 @@ public class AbstractDynamicClient extends AbstractClient {
|
|||||||
private final AbortingScheduler scheduler;
|
private final AbortingScheduler scheduler;
|
||||||
|
|
||||||
public AbstractDynamicClient(@Nonnull ServiceDescriptor service, HostsFile hosts, Supplier<Gson> gsonProvider) {
|
public AbstractDynamicClient(@Nonnull ServiceDescriptor service, HostsFile hosts, Supplier<Gson> gsonProvider) {
|
||||||
super(hosts.getHost(service), service.port, 10, gsonProvider);
|
super(
|
||||||
|
new ClientRoute(hosts.getHost(service), service.port),
|
||||||
|
10,
|
||||||
|
gsonProvider
|
||||||
|
);
|
||||||
|
|
||||||
this.service = service;
|
this.service = service;
|
||||||
this.scheduler = new AbortingScheduler(name());
|
this.scheduler = new AbortingScheduler(name());
|
||||||
@ -32,14 +37,6 @@ public class AbstractDynamicClient extends AbstractClient {
|
|||||||
return service;
|
return service;
|
||||||
}
|
}
|
||||||
|
|
||||||
@SneakyThrows
|
|
||||||
public void blockingWait() {
|
|
||||||
logger.info("Waiting for route to {} ({})", service, getServiceRoute());
|
|
||||||
while (!isAlive()) {
|
|
||||||
Thread.sleep(1000);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public AbortingScheduler scheduler() {
|
public AbortingScheduler scheduler() {
|
||||||
return scheduler;
|
return scheduler;
|
||||||
|
@ -0,0 +1,4 @@
|
|||||||
|
package nu.marginalia.client.model;
|
||||||
|
|
||||||
|
public record ClientRoute(String host, int port) {
|
||||||
|
}
|
@ -5,6 +5,7 @@ import io.reactivex.rxjava3.core.Observable;
|
|||||||
import lombok.AllArgsConstructor;
|
import lombok.AllArgsConstructor;
|
||||||
import lombok.Data;
|
import lombok.Data;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
|
import nu.marginalia.client.model.ClientRoute;
|
||||||
import org.junit.jupiter.api.*;
|
import org.junit.jupiter.api.*;
|
||||||
import spark.Request;
|
import spark.Request;
|
||||||
import spark.Response;
|
import spark.Response;
|
||||||
@ -33,7 +34,7 @@ public class AbstractClientTest {
|
|||||||
int port = new Random().nextInt(6000, 10000);
|
int port = new Random().nextInt(6000, 10000);
|
||||||
testServer = new TestServer(port);
|
testServer = new TestServer(port);
|
||||||
|
|
||||||
client = new AbstractClient("localhost", port, 1, Gson::new) {
|
client = new AbstractClient(new ClientRoute("localhost", port), 1, Gson::new) {
|
||||||
@Override
|
@Override
|
||||||
public AbortingScheduler scheduler() {
|
public AbortingScheduler scheduler() {
|
||||||
return new AbortingScheduler(name());
|
return new AbortingScheduler(name());
|
||||||
@ -79,47 +80,47 @@ public class AbstractClientTest {
|
|||||||
public void testGetTimeout() {
|
public void testGetTimeout() {
|
||||||
testServer.get(this::timeout);
|
testServer.get(this::timeout);
|
||||||
|
|
||||||
assertError(client.get(Context.internal(), "/get"));
|
assertError(client.get(Context.internal(), 0, "/get"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPostTimeout() {
|
public void testPostTimeout() {
|
||||||
testServer.post(this::timeout);
|
testServer.post(this::timeout);
|
||||||
|
|
||||||
assertError(client.post(Context.internal(), "/post", "test"));
|
assertError(client.post(Context.internal(), 0, "/post", "test"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDeleteTimeout() {
|
public void testDeleteTimeout() {
|
||||||
testServer.delete(this::timeout);
|
testServer.delete(this::timeout);
|
||||||
|
|
||||||
assertError(client.delete(Context.internal(), "/post"));
|
assertError(client.delete(Context.internal(), 0,"/post"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPost404() {
|
public void testPost404() {
|
||||||
testServer.post(this::error404);
|
testServer.post(this::error404);
|
||||||
|
|
||||||
assertError(client.post(Context.internal(), "/post", "test"));
|
assertError(client.post(Context.internal(), 0,"/post", "test"));
|
||||||
}
|
}
|
||||||
@Test
|
@Test
|
||||||
public void testGet404() {
|
public void testGet404() {
|
||||||
testServer.get(this::error404);
|
testServer.get(this::error404);
|
||||||
|
|
||||||
assertError(client.get(Context.internal(), "/get"));
|
assertError(client.get(Context.internal(), 0,"/get"));
|
||||||
}
|
}
|
||||||
@Test
|
@Test
|
||||||
public void testDelete404() {
|
public void testDelete404() {
|
||||||
testServer.delete(this::error404);
|
testServer.delete(this::error404);
|
||||||
|
|
||||||
assertError(client.delete(Context.internal(), "/delete"));
|
assertError(client.delete(Context.internal(),0, "/delete"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGet() {
|
public void testGet() {
|
||||||
testServer.get((req, rsp) -> "Hello World");
|
testServer.get((req, rsp) -> "Hello World");
|
||||||
|
|
||||||
assertEquals("Hello World", client.get(Context.internal(), "/get").blockingFirst());
|
assertEquals("Hello World", client.get(Context.internal(), 0,"/get").blockingFirst());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -138,7 +139,7 @@ public class AbstractClientTest {
|
|||||||
public void testGetJson() {
|
public void testGetJson() {
|
||||||
testServer.get((req, rsp) -> new DummyObject(5, "23"), new Gson()::toJson);
|
testServer.get((req, rsp) -> new DummyObject(5, "23"), new Gson()::toJson);
|
||||||
|
|
||||||
assertEquals(client.get(Context.internal(), "/get", DummyObject.class).blockingFirst(),
|
assertEquals(client.get(Context.internal(), 0,"/get", DummyObject.class).blockingFirst(),
|
||||||
new DummyObject(5, "23"));
|
new DummyObject(5, "23"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,7 +148,7 @@ public class AbstractClientTest {
|
|||||||
public void testDelete() {
|
public void testDelete() {
|
||||||
testServer.delete((req, rsp) -> "Hello World");
|
testServer.delete((req, rsp) -> "Hello World");
|
||||||
|
|
||||||
assertTrue(client.delete(Context.internal(), "/delete").blockingFirst().isGood());
|
assertTrue(client.delete(Context.internal(), 0,"/delete").blockingFirst().isGood());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -159,7 +160,7 @@ public class AbstractClientTest {
|
|||||||
return "ok";
|
return "ok";
|
||||||
});
|
});
|
||||||
|
|
||||||
client.post(Context.internal(), "/post", new DummyObject(5, "23")).blockingSubscribe();
|
client.post(Context.internal(),0, "/post", new DummyObject(5, "23")).blockingSubscribe();
|
||||||
assertEquals(1, inbox.size());
|
assertEquals(1, inbox.size());
|
||||||
assertEquals(new DummyObject(5, "23"), inbox.get(0));
|
assertEquals(new DummyObject(5, "23"), inbox.get(0));
|
||||||
}
|
}
|
||||||
@ -172,7 +173,7 @@ public class AbstractClientTest {
|
|||||||
return new DummyObject(1, "ret");
|
return new DummyObject(1, "ret");
|
||||||
}, gson::toJson);
|
}, gson::toJson);
|
||||||
|
|
||||||
var ret = client.postGet(Context.internal(), "/post", new DummyObject(5, "23"), DummyObject.class).blockingFirst();
|
var ret = client.postGet(Context.internal(), 0,"/post", new DummyObject(5, "23"), DummyObject.class).blockingFirst();
|
||||||
assertEquals(1, inbox.size());
|
assertEquals(1, inbox.size());
|
||||||
assertEquals(new DummyObject(5, "23"), inbox.get(0));
|
assertEquals(new DummyObject(5, "23"), inbox.get(0));
|
||||||
assertEquals(new DummyObject(1, "ret"), ret);
|
assertEquals(new DummyObject(1, "ret"), ret);
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package nu.marginalia.service.descriptor;
|
package nu.marginalia.service.descriptor;
|
||||||
|
|
||||||
|
import nu.marginalia.service.SearchServiceDescriptors;
|
||||||
import nu.marginalia.service.id.ServiceId;
|
import nu.marginalia.service.id.ServiceId;
|
||||||
|
|
||||||
import java.util.LinkedHashMap;
|
import java.util.LinkedHashMap;
|
||||||
@ -7,6 +8,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
|
|
||||||
|
/** @see SearchServiceDescriptors */
|
||||||
public class ServiceDescriptors {
|
public class ServiceDescriptors {
|
||||||
private final Map<ServiceId, ServiceDescriptor> descriptorsAll = new LinkedHashMap<>();
|
private final Map<ServiceId, ServiceDescriptor> descriptorsAll = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
@ -1,9 +1,6 @@
|
|||||||
package nu.marginalia.service.module;
|
package nu.marginalia.service.module;
|
||||||
|
|
||||||
import com.google.inject.AbstractModule;
|
import com.google.inject.AbstractModule;
|
||||||
import com.google.inject.Provides;
|
|
||||||
import com.google.inject.name.Named;
|
|
||||||
import com.google.inject.name.Names;
|
|
||||||
import nu.marginalia.service.descriptor.ServiceDescriptors;
|
import nu.marginalia.service.descriptor.ServiceDescriptors;
|
||||||
import nu.marginalia.service.id.ServiceId;
|
import nu.marginalia.service.id.ServiceId;
|
||||||
|
|
||||||
@ -22,12 +19,45 @@ public class ConfigurationModule extends AbstractModule {
|
|||||||
public void configure() {
|
public void configure() {
|
||||||
bind(ServiceDescriptors.class).toInstance(descriptors);
|
bind(ServiceDescriptors.class).toInstance(descriptors);
|
||||||
|
|
||||||
int basePort = descriptors.forId(id).port;
|
var configObject = new ServiceConfiguration(id,
|
||||||
int prometheusPort = basePort + 1000;
|
getNode(),
|
||||||
String host = System.getProperty("service-host", "127.0.0.1");
|
getHost(),
|
||||||
var configObject = new ServiceConfiguration(id, 0, host, basePort, prometheusPort, UUID.randomUUID());
|
getBasePort(),
|
||||||
|
getPrometheusPort(),
|
||||||
|
UUID.randomUUID()
|
||||||
|
);
|
||||||
|
|
||||||
bind(ServiceConfiguration.class).toInstance(configObject);
|
bind(ServiceConfiguration.class).toInstance(configObject);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private int getBasePort() {
|
||||||
|
String port = System.getenv("WMSA_SERVICE_PORT");
|
||||||
|
|
||||||
|
if (port != null) {
|
||||||
|
return Integer.parseInt(port);
|
||||||
|
}
|
||||||
|
|
||||||
|
return descriptors.forId(id).port;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getPrometheusPort() {
|
||||||
|
String prometheusPortEnv = System.getenv("WMSA_PROMETHEUS_PORT");
|
||||||
|
|
||||||
|
if (prometheusPortEnv != null) {
|
||||||
|
return Integer.parseInt(prometheusPortEnv);
|
||||||
|
}
|
||||||
|
|
||||||
|
return descriptors.forId(id).port + 1000;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int getNode() {
|
||||||
|
String nodeEnv = Objects.requireNonNullElse(System.getenv("WMSA_SERVICE_NODE"), "0");
|
||||||
|
|
||||||
|
return Integer.parseInt(nodeEnv);
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getHost() {
|
||||||
|
return System.getProperty("service-host", "127.0.0.1");
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -28,40 +28,14 @@ public class SearchErrorPageService {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void serveError(Context ctx, Request request, Response rsp) {
|
public void serveError(Context ctx, Request request, Response rsp) {
|
||||||
boolean isIndexUp = indexClient.isAlive();
|
rsp.body(renderError(request, "Internal error",
|
||||||
|
"""
|
||||||
try {
|
An error occurred when communicating with the search engine index.
|
||||||
if (!isIndexUp) {
|
<p>
|
||||||
rsp.body(renderError(request, "The index is down",
|
This is hopefully a temporary state of affairs. It may be due to
|
||||||
"""
|
an upgrade. The index typically takes a about two or three minutes
|
||||||
The search index server appears to be down.
|
to reload from a cold restart. Thanks for your patience.
|
||||||
<p>
|
"""));
|
||||||
The server was possibly restarted to bring online some changes.
|
|
||||||
Restarting the index typically takes a few minutes, during which
|
|
||||||
searches can't be served.
|
|
||||||
"""));
|
|
||||||
} else if (indexClient.isBlocked(ctx).blockingFirst()) {
|
|
||||||
rsp.body(renderError(request, "The index is starting up",
|
|
||||||
"""
|
|
||||||
The search index server appears to be in the process of starting up.
|
|
||||||
This typically takes a few minutes. Be patient.
|
|
||||||
"""));
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
rsp.body(renderError(request, "Error processing request",
|
|
||||||
"""
|
|
||||||
The search index appears to be up and running, so the problem may be related
|
|
||||||
to some wider general error, or pertain to an error handling your query.
|
|
||||||
"""));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
catch (Exception ex) {
|
|
||||||
logger.warn("Error during rendering of error page", ex);
|
|
||||||
rsp.body(renderError(request, "Error processing error",
|
|
||||||
"""
|
|
||||||
An error has occurred, additionally, an error occurred while handling that error
|
|
||||||
"""));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private String renderError(Request request, String title, String message) {
|
private String renderError(Request request, String title, String message) {
|
||||||
|
@ -73,7 +73,7 @@ public class QueryService extends Service {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private SearchResultSet executeQuery(Context ctx, SearchSpecification query) {
|
private SearchResultSet executeQuery(Context ctx, SearchSpecification query) {
|
||||||
return indexClient.query(ctx, query);
|
return indexClient.query(ctx, 0, query);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isBlacklisted(DecoratedSearchResultItem item) {
|
private boolean isBlacklisted(DecoratedSearchResultItem item) {
|
||||||
|
Loading…
Reference in New Issue
Block a user