(registry) Correct registerMonitor() behavior
The previous implementation listened to too many changes, and, based on ZooKeeper rather than Curator assumptions about watcher behavior, added an additional monitor on each invocation of each monitor (which always triggers on service state changes). Because Curator watches are persistent, this meant every monitor re-registered itself, effectively doubling the number of monitors whenever a service stopped or started. That in turn caused a lot of bizarre thrashing, even on changes to services that don't explicitly talk to each other. This re-registering behavior is no longer done.
commit 57e6a12d08
parent 46423612e3
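
To see why the old callback was pathological: with Curator, watches added through watchers().add() are persistent and keep firing, so a callback that re-registers itself adds a brand-new watcher on every event. Below is a minimal self-contained toy model of that dynamic; it is not the real Curator API, and every name in it is invented for illustration.

import java.util.ArrayList;
import java.util.List;

/** Toy model (not the real Curator API) of why re-registering a
 *  persistent watcher from inside its own callback doubles the
 *  watcher count on every event. */
public class WatcherDoublingDemo {
    // Stands in for a persistent-watch registry: watchers stay
    // registered after firing, unlike classic one-shot ZK watches.
    static final List<Runnable> watchers = new ArrayList<>();

    // Mirrors the old registerMonitor() callback: re-adding itself is
    // necessary for one-shot ZK watches but wrong for persistent ones.
    static void registerMonitor() {
        watchers.add(WatcherDoublingDemo::onEvent);
    }

    static void onEvent() {
        registerMonitor(); // re-register "just in case" -- the bug
    }

    public static void main(String[] args) {
        registerMonitor();
        for (int event = 1; event <= 4; event++) {
            // fire every currently-registered watcher once
            for (Runnable w : List.copyOf(watchers)) w.run();
            System.out.println("after event " + event + ": "
                    + watchers.size() + " watchers");
        }
    }
}

Run standalone, this prints 2, 4, 8, 16 watchers after four events: the doubling per service start/stop that the commit message describes.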
@@ -52,7 +52,7 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
 
 
     @Override
-    public synchronized boolean onChange() {
+    public synchronized void onChange() {
         Set<InstanceAddress> newRoutes = serviceRegistryIf.getEndpoints(serviceKey);
         Set<InstanceAddress> oldRoutes = new HashSet<>(channels.keySet());
 
@@ -71,7 +71,6 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
             }
         }
 
-        return true;
     }
 
     // Mostly for testing
@@ -9,9 +9,9 @@ import static nu.marginalia.service.discovery.property.ServiceEndpoint.*;
 
 import nu.marginalia.service.discovery.property.ServiceKey;
 import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.api.CuratorWatcher;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -212,29 +212,35 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
 
         String path = monitor.getKey().toPath();
 
-        CuratorWatcher watcher = change -> {
-            boolean reRegister;
-            try {
-                reRegister = monitor.onChange();
-            }
-            catch (Exception ex) {
-                logger.error("Monitor for path {} failed", path, ex);
-                reRegister = true;
-            }
-
-            if (reRegister) {
-                registerMonitor(monitor);
-            }
-        };
-
         curatorFramework.watchers().add()
-                .usingWatcher(watcher)
+                .usingWatcher((Watcher) change -> {
+                    Watcher.Event.EventType type = change.getType();
+
+                    if (type == Watcher.Event.EventType.NodeCreated) {
+                        monitor.onChange();
+                    }
+                    if (type == Watcher.Event.EventType.NodeDeleted) {
+                        monitor.onChange();
+                    }
+                })
                 .forPath(path);
 
         // Also register for updates to the running-instances list,
         // as this will have an effect on the result of getEndpoints()
         curatorFramework.watchers().add()
-                .usingWatcher(watcher)
+                .usingWatcher((Watcher) change -> {
+                    Watcher.Event.EventType type = change.getType();
+
+                    if ("/running-instances".equals(change.getPath()))
+                        return;
+
+                    if (type == Watcher.Event.EventType.NodeCreated) {
+                        monitor.onChange();
+                    }
+                    if (type == Watcher.Event.EventType.NodeDeleted) {
+                        monitor.onChange();
+                    }
+                })
                 .forPath("/running-instances");
     }
 
@@ -9,7 +9,7 @@ public abstract class ServiceChangeMonitor implements ServiceMonitorIf {
         this.serviceKey = key;
     }
 
-    public abstract boolean onChange();
+    public abstract void onChange();
     public ServiceKey<?> getKey() {
         return serviceKey;
     }
@@ -4,10 +4,7 @@ package nu.marginalia.service.discovery.monitor;
 import nu.marginalia.service.discovery.property.ServiceKey;
 
 public interface ServiceMonitorIf {
-    /** Called when the monitored service has changed.
-     * @return true if the monitor is to be refreshed
-     */
-    boolean onChange();
+    void onChange();
     ServiceKey<?> getKey();
 
 }
@@ -1,5 +1,6 @@
 package nu.marginalia.service.discovery;
 
+import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
 import nu.marginalia.service.discovery.property.ServiceKey;
 import nu.marginalia.service.discovery.property.ServicePartition;
 import nu.marginalia.service.ServiceId;
@@ -9,12 +10,14 @@ import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.junit.jupiter.api.*;
 import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.mockito.Mockito;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.util.*;
 
 import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.Mockito.when;
 
 @Testcontainers
 @Execution(ExecutionMode.SAME_THREAD)
@@ -173,4 +176,35 @@ class ZkServiceRegistryTest {
 
         registry1.shutDown();
     }
+
+    @Test
+    public void testMonitors() throws Exception {
+        var registry1 = createRegistry();
+        var registry2 = createRegistry();
+        var uuid1 = UUID.randomUUID();
+        var uuid2 = UUID.randomUUID();
+
+        var monitor = Mockito.mock(ServiceMonitorIf.class);
+        when(monitor.getKey()).thenReturn((ServiceKey)ServiceKey.forGrpcApi(
+                TestApiGrpc.class,
+                ServicePartition.any()));
+
+        // This test has sleeps that make it a bit
+        // easier to reason about, as it lets us pretend
+        // a distributed concurrent system is not.
+
+        registry1.registerMonitor(monitor);
+
+        Thread.sleep(100);
+        registry2.announceInstance(uuid1);
+        Thread.sleep(100);
+        registry1.announceInstance(uuid2);
+        Thread.sleep(100);
+        registry1.shutDown();
+
+        Thread.sleep(100);
+
+        Mockito.verify(monitor, Mockito.times(3)).onChange();
+
+    }
 }
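
Under the new contract, implementing a monitor is simpler: onChange() returns nothing, and registration happens exactly once since the underlying watch is persistent. A minimal sketch follows, assuming only the ServiceMonitorIf and ServiceKey types shown in the diffs above; the LoggingMonitor class is hypothetical and not part of the commit.

import nu.marginalia.service.discovery.monitor.ServiceMonitorIf;
import nu.marginalia.service.discovery.property.ServiceKey;

// Hypothetical example class; not part of the commit.
class LoggingMonitor implements ServiceMonitorIf {
    private final ServiceKey<?> serviceKey;

    LoggingMonitor(ServiceKey<?> serviceKey) {
        this.serviceKey = serviceKey;
    }

    @Override
    public void onChange() {
        // Fired by the persistent watch on both NodeCreated and NodeDeleted;
        // no boolean return value, so no accidental re-registration.
        System.out.println("Service state changed for " + serviceKey);
    }

    @Override
    public ServiceKey<?> getKey() {
        return serviceKey;
    }
}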