(grpc) Reduce error spam
This commit is contained in:
parent
c4a27003c6
commit
a0648844fb
@ -8,6 +8,7 @@ import nu.marginalia.service.discovery.monitor.ServiceChangeMonitor;
|
||||
import nu.marginalia.service.discovery.property.PartitionTraits;
|
||||
import nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
|
||||
import nu.marginalia.service.discovery.property.ServiceKey;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -73,9 +74,11 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||
return true;
|
||||
}
|
||||
|
||||
private class ConnectionHolder {
|
||||
private class ConnectionHolder implements Comparable<ConnectionHolder> {
|
||||
private final AtomicReference<ManagedChannel> channel = new AtomicReference<>();
|
||||
private final InstanceAddress address;
|
||||
private volatile long lastError = Long.MIN_VALUE;
|
||||
private volatile long lastUsed = Long.MAX_VALUE;
|
||||
|
||||
ConnectionHolder(InstanceAddress address) {
|
||||
this.address = address;
|
||||
@ -83,6 +86,9 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||
|
||||
public ManagedChannel get() {
|
||||
var value = channel.get();
|
||||
|
||||
lastUsed = System.currentTimeMillis();
|
||||
|
||||
if (value != null) {
|
||||
return value;
|
||||
}
|
||||
@ -125,6 +131,23 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||
public int hashCode() {
|
||||
return Objects.hash(address);
|
||||
}
|
||||
|
||||
private boolean hasRecentError() {
|
||||
return System.currentTimeMillis() < lastError + 5000;
|
||||
}
|
||||
|
||||
void flagError() {
|
||||
lastError = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int compareTo(@NotNull GrpcSingleNodeChannelPool<STUB>.ConnectionHolder o) {
|
||||
// If one has recently errored and the other has not, the one that has not errored is preferred
|
||||
int diff = Boolean.compare(hasRecentError(), o.hasRecentError());
|
||||
if (diff != 0) return diff;
|
||||
|
||||
return Long.compare(lastUsed, o.lastUsed);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -150,14 +173,15 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
|
||||
final List<Exception> exceptions = new ArrayList<>();
|
||||
final List<ConnectionHolder> connectionHolders = new ArrayList<>(channels.values());
|
||||
|
||||
// Randomize the order of the connection holders to spread out the load
|
||||
Collections.shuffle(connectionHolders);
|
||||
Collections.sort(connectionHolders);
|
||||
|
||||
for (var channel : connectionHolders) {
|
||||
try {
|
||||
return call.apply(stubConstructor.apply(channel.get()), arg);
|
||||
}
|
||||
catch (Exception e) {
|
||||
channel.flagError();
|
||||
|
||||
exceptions.add(e);
|
||||
}
|
||||
}
|
||||
|
@ -8,6 +8,7 @@ import io.prometheus.client.Counter;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.mq.inbox.*;
|
||||
import nu.marginalia.service.NamedExecutorFactory;
|
||||
import nu.marginalia.service.client.ServiceNotAvailableException;
|
||||
import nu.marginalia.service.discovery.property.*;
|
||||
import nu.marginalia.service.id.ServiceId;
|
||||
import nu.marginalia.service.server.mq.ServiceMqSubscription;
|
||||
@ -99,7 +100,13 @@ public class Service {
|
||||
initialization.addCallback(() -> serviceRegistry.announceInstance(config.instanceUuid()));
|
||||
|
||||
Thread.setDefaultUncaughtExceptionHandler((t, e) -> {
|
||||
logger.error("Uncaught exception", e);
|
||||
if (e instanceof ServiceNotAvailableException) {
|
||||
// reduce log spam for this common case
|
||||
logger.error("Service not available: {}", e.getMessage());
|
||||
}
|
||||
else {
|
||||
logger.error("Uncaught exception", e);
|
||||
}
|
||||
request_counter_err.labels(serviceName, Integer.toString(node)).inc();
|
||||
});
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user