(cleanup) Minor cleanups

This commit is contained in:
Viktor Lofgren 2024-02-24 15:33:56 +01:00
parent 1d34224416
commit ff0ef1eebc
3 changed files with 33 additions and 18 deletions

View File

@ -48,20 +48,32 @@ public class GrpcMultiNodeChannelPool<STUB> {
}
private GrpcSingleNodeChannelPool<STUB> getPoolForNode(int node) {
return pools.computeIfAbsent(node, _ ->
new GrpcSingleNodeChannelPool<>(
serviceRegistryIf,
serviceKey.forPartition(ServicePartition.partition(node)),
channelConstructor,
stubConstructor));
return pools.computeIfAbsent(node, this::newSingleChannelPool);
}
private GrpcSingleNodeChannelPool<STUB> newSingleChannelPool(int node) {
return new GrpcSingleNodeChannelPool<>(
serviceRegistryIf,
serviceKey.forPartition(ServicePartition.partition(node)),
channelConstructor,
stubConstructor);
}
/** Get the list of nodes that are eligible for broadcast-style requests */
public List<Integer> getEligibleNodes() {
return nodeConfigurationWatcher.getQueryNodes();
}
/** Create a new call builder for the given method. This is a fluent-style
* method, where you can chain calls to specify how to run the method.
* <p></p>
* Example:
* <code><pre>
* var results = channelPool.call(AStub:someMethod)
* .async(someExecutor)
* .runAll(argumentToSomeMethod);
* </pre></code>
* */
public <T, I> CallBuilderBase<T, I> call(BiFunction<STUB, I, T> method) {
return new CallBuilderBase<>(method);
}
@ -73,16 +85,20 @@ public class GrpcMultiNodeChannelPool<STUB> {
this.method = method;
}
/** Create a call for the given method on the given node */
public GrpcSingleNodeChannelPool<STUB>.CallBuilderBase<T, I> forNode(int node) {
return getPoolForNode(node).call(method);
}
/** Run the given method on each node, returning a list of results.
* This is a blocking method, where each call will be made in sequence */
public List<T> run(I arg) {
return getEligibleNodes().stream()
.map(node -> getPoolForNode(node).call(method).run(arg))
.toList();
}
/** Generate an async call builder for the given method */
public CallBuilderAsync<T, I> async(ExecutorService service) {
return new CallBuilderAsync<>(service, method);
}

View File

@ -169,6 +169,9 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
throw new ServiceNotAvailableException(serviceKey);
}
/** Create a call for the given method on the given node.
* This is a fluent method, so you can chain it with other
* methods to specify the node and arguments */
public <T, I> CallBuilderBase<T, I> call(BiFunction<STUB, I, T> method) {
return new CallBuilderBase<>(method);
}
@ -179,21 +182,12 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
this.method = method;
}
/** Execute the call in a blocking manner */
public T run(I arg) {
return call(method, arg);
}
public List<T> runFor(I... args) {
return runFor(List.of(args));
}
public List<T> runFor(List<I> args) {
List<T> results = new ArrayList<>();
for (var arg : args) {
results.add(call(method, arg));
}
return results;
}
/** Create an asynchronous call using the provided executor */
public CallBuilderAsync<T, I> async(Executor executor) {
return new CallBuilderAsync<>(executor, method);
}
@ -207,9 +201,12 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
this.method = method;
}
/** Execute the call in an asynchronous manner */
public CompletableFuture<T> run(I arg) {
return CompletableFuture.supplyAsync(() -> call(method, arg), executor);
}
/** Execute the call in an asynchronous manner for each of the given arguments */
public CompletableFuture<List<T>> runFor(List<I> args) {
List<CompletableFuture<T>> results = new ArrayList<>();
for (var arg : args) {
@ -218,6 +215,8 @@ public class GrpcSingleNodeChannelPool<STUB> extends ServiceChangeMonitor {
return CompletableFuture.allOf(results.toArray(new CompletableFuture[0]))
.thenApply(v -> results.stream().map(CompletableFuture::join).toList());
}
/** Execute the call in an asynchronous manner for each of the given arguments */
public CompletableFuture<List<T>> runFor(I... args) {
return runFor(List.of(args));
}

View File

@ -79,7 +79,7 @@ public class ZkServiceRegistry implements ServiceRegistryIf {
curatorFramework.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(STR."/first-boot");
.forPath("/first-boot");
}
}