(client) Refactor liveness monitor

This commit is contained in:
Viktor Lofgren 2023-10-16 12:34:01 +02:00
parent f718482e98
commit 3d1c15ef99
2 changed files with 103 additions and 65 deletions

View File

@ -32,11 +32,10 @@ public abstract class AbstractClient implements AutoCloseable {
private final OkHttpClient client;
private boolean quiet;
private final ServiceRoutes serviceRoutes;
final ServiceRoutes serviceRoutes;
private int timeout;
private final LivenessMonitor livenessMonitor = new LivenessMonitor();
private final Thread livenessMonitorThread;
private final EndpointLivenessMonitor livenessMonitor;
public void setTimeout(int timeout) {
this.timeout = timeout;
@ -71,71 +70,13 @@ public abstract class AbstractClient implements AutoCloseable {
}
});
livenessMonitorThread = new Thread(livenessMonitor, getClass().getSimpleName() + "-monitor");
livenessMonitorThread.setDaemon(true);
livenessMonitorThread.start();
logger.info("Finished creating client for {}", getClass().getSimpleName());
}
private class LivenessMonitor implements Runnable {
private final ConcurrentHashMap<Integer, Boolean> alivenessMap = new ConcurrentHashMap<>();
@SneakyThrows
public void run() {
Thread.sleep(100); // Wait for initialization
try {
for (; ; ) {
boolean allAlive = true;
try {
for (int node : serviceRoutes.getNodes()) {
boolean isResponsive = isResponsive(node);
alivenessMap.put(node, isResponsive);
allAlive &= isResponsive;
}
}
//
catch (Exception ex) {
logger.warn("Oops", ex);
}
if (allAlive) {
synchronized (this) {
wait(1000);
}
}
else {
Thread.sleep(100);
}
}
} catch (InterruptedException ex) {
// nothing to see here
}
}
public boolean isAlive(int node) {
// compute-if-absence ensures we do a synchronous status check if this is a cold start,
// that way we don't have to wait for the polling loop to find out if the service is up
return alivenessMap.computeIfAbsent(node, this::isResponsive);
}
public synchronized boolean isResponsive(int node) {
Context ctx = Context.internal("ping");
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build();
return Observable.just(client.newCall(req))
.subscribeOn(scheduler().get())
.map(Call::execute)
.map(AbstractClient.this::getResponseStatus)
.flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500))
.onErrorReturn(error -> 500)
.map(HttpStatusCode::new)
.map(HttpStatusCode::isGood)
.blockingFirst();
}
livenessMonitor = new EndpointLivenessMonitor(this);
}
@Override
public void close() {
livenessMonitorThread.interrupt();
livenessMonitor.close();
scheduler().close();
}
@ -165,6 +106,20 @@ public abstract class AbstractClient implements AutoCloseable {
.blockingFirst();
}
public synchronized boolean isResponsive(int node) {
Context ctx = Context.internal("ping");
var req = ctx.paint(new Request.Builder()).url(serviceRoutes.get(node) + "/internal/ping").get().build();
return Observable.just(client.newCall(req))
.subscribeOn(scheduler().get())
.map(Call::execute)
.map(AbstractClient.this::getResponseStatus)
.flatMap(line -> validateStatus(line, req).timeout(5000, TimeUnit.SECONDS).onErrorReturn(e -> 500))
.onErrorReturn(error -> 500)
.map(HttpStatusCode::new)
.map(HttpStatusCode::isGood)
.blockingFirst();
}
@SneakyThrows
protected synchronized Observable<HttpStatusCode> post(Context ctx,
int node,
@ -330,8 +285,8 @@ public abstract class AbstractClient implements AutoCloseable {
if (!isAlive(node)) {
var route = serviceRoutes.get(node);
logger.error("Route not configured for {}:{}; {}; {}", name(), node, livenessMonitor.alivenessMap, serviceRoutes.getNodes()
.stream().map(serviceRoutes::get).toList());
logger.error("Route not configured for {}:{}", name(), node);
throw new RouteNotConfiguredException("Route not configured for " + name() + ":" + node + " -- tried " + route);
}
}

View File

@ -0,0 +1,83 @@
package nu.marginalia.client;
import lombok.SneakyThrows;
import nu.marginalia.client.route.ServiceRoutes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;
/** Keep tabs on which endpoints are accessible via polling. This permits us to reduce the chances of
* synchronous requests blocking on timeout.
*/
public class EndpointLivenessMonitor {
private final ConcurrentHashMap<Integer, Boolean> alivenessMap = new ConcurrentHashMap<>();
private final AbstractClient client;
private final ServiceRoutes serviceRoutes;
private static final Logger logger = LoggerFactory.getLogger(EndpointLivenessMonitor.class);
private static Thread daemonThread;
public EndpointLivenessMonitor(AbstractClient client) {
this.client = client;
this.serviceRoutes = client.serviceRoutes;
daemonThread = new Thread(this::run, client.getClass().getSimpleName()+":Liveness");
daemonThread.setDaemon(true);
daemonThread.start();
}
@SneakyThrows
public void run() {
Thread.sleep(100); // Wait for initialization
try {
while (!Thread.interrupted()) {
if (updateLivenessMap()) {
synchronized (this) {
wait(1000);
}
}
else Thread.sleep(100);
}
} catch (InterruptedException ex) {
// nothing to see here
}
}
private boolean updateLivenessMap() {
boolean allAlive = true;
for (int node : serviceRoutes.getNodes()) {
allAlive &= alivenessMap.compute(node, this::isResponsive);
}
return allAlive;
}
private boolean isResponsive(int node, Boolean oldValue) {
try {
boolean wasAlive = Boolean.TRUE.equals(oldValue);
boolean isAlive = client.isResponsive(node);
if (wasAlive != isAlive) {
logger.info("Liveness change {}:{} -- {}", client.name(), node, isAlive ? "UP":"DOWN");
}
return isAlive;
}
catch (Exception ex) {
logger.warn("Oops", ex);
return false;
}
}
public boolean isAlive(int node) {
// compute-if-absence ensures we do a synchronous status check if this is a cold start,
// that way we don't have to wait for the polling loop to find out if the service is up
return alivenessMap.computeIfAbsent(node, client::isResponsive);
}
public void close() {
daemonThread.interrupt();
}
}