(client) Refactor GrpcStubPool to handle error states
Refactored the GRPC Stub Pool for better handling of channel SHUTDOWN state. Any disconnected channels are now re-created before returning the stub. The class was also renamed to GrpcChannelPool, as we no longer pool the stubs.
This commit is contained in:
parent
37a7296759
commit
92717a4832
@ -4,7 +4,7 @@ import com.google.inject.Inject;
|
|||||||
import com.google.inject.Singleton;
|
import com.google.inject.Singleton;
|
||||||
import nu.marginalia.client.AbstractDynamicClient;
|
import nu.marginalia.client.AbstractDynamicClient;
|
||||||
import nu.marginalia.client.Context;
|
import nu.marginalia.client.Context;
|
||||||
import nu.marginalia.client.grpc.GrpcStubPool;
|
import nu.marginalia.client.grpc.GrpcChannelPool;
|
||||||
import nu.marginalia.executor.api.*;
|
import nu.marginalia.executor.api.*;
|
||||||
import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
|
import nu.marginalia.executor.api.ExecutorApiGrpc.ExecutorApiBlockingStub;
|
||||||
import nu.marginalia.executor.model.ActorRunState;
|
import nu.marginalia.executor.model.ActorRunState;
|
||||||
@ -35,14 +35,14 @@ import java.util.concurrent.TimeUnit;
|
|||||||
|
|
||||||
@Singleton
|
@Singleton
|
||||||
public class ExecutorClient extends AbstractDynamicClient {
|
public class ExecutorClient extends AbstractDynamicClient {
|
||||||
private final GrpcStubPool<ExecutorApiBlockingStub> stubPool;
|
private final GrpcChannelPool<ExecutorApiBlockingStub> channelPool;
|
||||||
private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
|
private static final Logger logger = LoggerFactory.getLogger(ExecutorClient.class);
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public ExecutorClient(ServiceDescriptors descriptors, NodeConfigurationService nodeConfigurationService) {
|
public ExecutorClient(ServiceDescriptors descriptors, NodeConfigurationService nodeConfigurationService) {
|
||||||
super(descriptors.forId(ServiceId.Executor), GsonFactory::get);
|
super(descriptors.forId(ServiceId.Executor), GsonFactory::get);
|
||||||
|
|
||||||
stubPool = new GrpcStubPool<>(ServiceId.Executor) {
|
channelPool = new GrpcChannelPool<>(ServiceId.Executor) {
|
||||||
@Override
|
@Override
|
||||||
public ExecutorApiBlockingStub createStub(ManagedChannel channel) {
|
public ExecutorApiBlockingStub createStub(ManagedChannel channel) {
|
||||||
return ExecutorApiGrpc.newBlockingStub(channel);
|
return ExecutorApiGrpc.newBlockingStub(channel);
|
||||||
@ -59,7 +59,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void startFsm(int node, String actorName) {
|
public void startFsm(int node, String actorName) {
|
||||||
stubPool.apiForNode(node).startFsm(
|
channelPool.apiForNode(node).startFsm(
|
||||||
RpcFsmName.newBuilder()
|
RpcFsmName.newBuilder()
|
||||||
.setActorName(actorName)
|
.setActorName(actorName)
|
||||||
.build()
|
.build()
|
||||||
@ -67,7 +67,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void stopFsm(int node, String actorName) {
|
public void stopFsm(int node, String actorName) {
|
||||||
stubPool.apiForNode(node).stopFsm(
|
channelPool.apiForNode(node).stopFsm(
|
||||||
RpcFsmName.newBuilder()
|
RpcFsmName.newBuilder()
|
||||||
.setActorName(actorName)
|
.setActorName(actorName)
|
||||||
.build()
|
.build()
|
||||||
@ -75,7 +75,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void stopProcess(int node, String id) {
|
public void stopProcess(int node, String id) {
|
||||||
stubPool.apiForNode(node).stopProcess(
|
channelPool.apiForNode(node).stopProcess(
|
||||||
RpcProcessId.newBuilder()
|
RpcProcessId.newBuilder()
|
||||||
.setProcessId(id)
|
.setProcessId(id)
|
||||||
.build()
|
.build()
|
||||||
@ -83,7 +83,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void triggerCrawl(int node, FileStorageId fid) {
|
public void triggerCrawl(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).triggerCrawl(
|
channelPool.apiForNode(node).triggerCrawl(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
@ -91,7 +91,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void triggerRecrawl(int node, FileStorageId fid) {
|
public void triggerRecrawl(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).triggerRecrawl(
|
channelPool.apiForNode(node).triggerRecrawl(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
@ -99,7 +99,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void triggerConvert(int node, FileStorageId fid) {
|
public void triggerConvert(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).triggerConvert(
|
channelPool.apiForNode(node).triggerConvert(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
@ -107,7 +107,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void triggerConvertAndLoad(int node, FileStorageId fid) {
|
public void triggerConvertAndLoad(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).triggerConvertAndLoad(
|
channelPool.apiForNode(node).triggerConvertAndLoad(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
@ -115,7 +115,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void loadProcessedData(int node, List<FileStorageId> ids) {
|
public void loadProcessedData(int node, List<FileStorageId> ids) {
|
||||||
stubPool.apiForNode(node).loadProcessedData(
|
channelPool.apiForNode(node).loadProcessedData(
|
||||||
RpcFileStorageIds.newBuilder()
|
RpcFileStorageIds.newBuilder()
|
||||||
.addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList())
|
.addAllFileStorageIds(ids.stream().map(FileStorageId::id).toList())
|
||||||
.build()
|
.build()
|
||||||
@ -123,11 +123,11 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void calculateAdjacencies(int node) {
|
public void calculateAdjacencies(int node) {
|
||||||
stubPool.apiForNode(node).calculateAdjacencies(Empty.getDefaultInstance());
|
channelPool.apiForNode(node).calculateAdjacencies(Empty.getDefaultInstance());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) {
|
public void sideloadEncyclopedia(int node, Path sourcePath, String baseUrl) {
|
||||||
stubPool.apiForNode(node).sideloadEncyclopedia(
|
channelPool.apiForNode(node).sideloadEncyclopedia(
|
||||||
RpcSideloadEncyclopedia.newBuilder()
|
RpcSideloadEncyclopedia.newBuilder()
|
||||||
.setBaseUrl(baseUrl)
|
.setBaseUrl(baseUrl)
|
||||||
.setSourcePath(sourcePath.toString())
|
.setSourcePath(sourcePath.toString())
|
||||||
@ -136,21 +136,21 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void sideloadDirtree(int node, Path sourcePath) {
|
public void sideloadDirtree(int node, Path sourcePath) {
|
||||||
stubPool.apiForNode(node).sideloadDirtree(
|
channelPool.apiForNode(node).sideloadDirtree(
|
||||||
RpcSideloadDirtree.newBuilder()
|
RpcSideloadDirtree.newBuilder()
|
||||||
.setSourcePath(sourcePath.toString())
|
.setSourcePath(sourcePath.toString())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
public void sideloadReddit(int node, Path sourcePath) {
|
public void sideloadReddit(int node, Path sourcePath) {
|
||||||
stubPool.apiForNode(node).sideloadReddit(
|
channelPool.apiForNode(node).sideloadReddit(
|
||||||
RpcSideloadReddit.newBuilder()
|
RpcSideloadReddit.newBuilder()
|
||||||
.setSourcePath(sourcePath.toString())
|
.setSourcePath(sourcePath.toString())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
public void sideloadWarc(int node, Path sourcePath) {
|
public void sideloadWarc(int node, Path sourcePath) {
|
||||||
stubPool.apiForNode(node).sideloadWarc(
|
channelPool.apiForNode(node).sideloadWarc(
|
||||||
RpcSideloadWarc.newBuilder()
|
RpcSideloadWarc.newBuilder()
|
||||||
.setSourcePath(sourcePath.toString())
|
.setSourcePath(sourcePath.toString())
|
||||||
.build()
|
.build()
|
||||||
@ -158,7 +158,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void sideloadStackexchange(int node, Path sourcePath) {
|
public void sideloadStackexchange(int node, Path sourcePath) {
|
||||||
stubPool.apiForNode(node).sideloadStackexchange(
|
channelPool.apiForNode(node).sideloadStackexchange(
|
||||||
RpcSideloadStackexchange.newBuilder()
|
RpcSideloadStackexchange.newBuilder()
|
||||||
.setSourcePath(sourcePath.toString())
|
.setSourcePath(sourcePath.toString())
|
||||||
.build()
|
.build()
|
||||||
@ -166,7 +166,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void createCrawlSpecFromDownload(int node, String description, String url) {
|
public void createCrawlSpecFromDownload(int node, String description, String url) {
|
||||||
stubPool.apiForNode(node).createCrawlSpecFromDownload(
|
channelPool.apiForNode(node).createCrawlSpecFromDownload(
|
||||||
RpcCrawlSpecFromDownload.newBuilder()
|
RpcCrawlSpecFromDownload.newBuilder()
|
||||||
.setDescription(description)
|
.setDescription(description)
|
||||||
.setUrl(url)
|
.setUrl(url)
|
||||||
@ -175,14 +175,14 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void exportAtags(int node, FileStorageId fid) {
|
public void exportAtags(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).exportAtags(
|
channelPool.apiForNode(node).exportAtags(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
|
public void exportSampleData(int node, FileStorageId fid, int size, String name) {
|
||||||
stubPool.apiForNode(node).exportSampleData(
|
channelPool.apiForNode(node).exportSampleData(
|
||||||
RpcExportSampleData.newBuilder()
|
RpcExportSampleData.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.setSize(size)
|
.setSize(size)
|
||||||
@ -192,14 +192,14 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void exportRssFeeds(int node, FileStorageId fid) {
|
public void exportRssFeeds(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).exportRssFeeds(
|
channelPool.apiForNode(node).exportRssFeeds(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
public void exportTermFrequencies(int node, FileStorageId fid) {
|
public void exportTermFrequencies(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).exportTermFrequencies(
|
channelPool.apiForNode(node).exportTermFrequencies(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
@ -207,7 +207,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void downloadSampleData(int node, String sampleSet) {
|
public void downloadSampleData(int node, String sampleSet) {
|
||||||
stubPool.apiForNode(node).downloadSampleData(
|
channelPool.apiForNode(node).downloadSampleData(
|
||||||
RpcDownloadSampleData.newBuilder()
|
RpcDownloadSampleData.newBuilder()
|
||||||
.setSampleSet(sampleSet)
|
.setSampleSet(sampleSet)
|
||||||
.build()
|
.build()
|
||||||
@ -215,11 +215,11 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public void exportData(int node) {
|
public void exportData(int node) {
|
||||||
stubPool.apiForNode(node).exportData(Empty.getDefaultInstance());
|
channelPool.apiForNode(node).exportData(Empty.getDefaultInstance());
|
||||||
}
|
}
|
||||||
|
|
||||||
public void restoreBackup(int node, FileStorageId fid) {
|
public void restoreBackup(int node, FileStorageId fid) {
|
||||||
stubPool.apiForNode(node).restoreBackup(
|
channelPool.apiForNode(node).restoreBackup(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fid.id())
|
.setFileStorageId(fid.id())
|
||||||
.build()
|
.build()
|
||||||
@ -228,7 +228,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
|
|
||||||
public ActorRunStates getActorStates(int node) {
|
public ActorRunStates getActorStates(int node) {
|
||||||
try {
|
try {
|
||||||
var rs = stubPool.apiForNode(node).getActorStates(Empty.getDefaultInstance());
|
var rs = channelPool.apiForNode(node).getActorStates(Empty.getDefaultInstance());
|
||||||
var states = rs.getActorRunStatesList().stream()
|
var states = rs.getActorRunStatesList().stream()
|
||||||
.map(r -> new ActorRunState(
|
.map(r -> new ActorRunState(
|
||||||
r.getActorName(),
|
r.getActorName(),
|
||||||
@ -252,7 +252,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
|
|
||||||
public UploadDirContents listSideloadDir(int node) {
|
public UploadDirContents listSideloadDir(int node) {
|
||||||
try {
|
try {
|
||||||
var rs = stubPool.apiForNode(node).listSideloadDir(Empty.getDefaultInstance());
|
var rs = channelPool.apiForNode(node).listSideloadDir(Empty.getDefaultInstance());
|
||||||
var items = rs.getEntriesList().stream()
|
var items = rs.getEntriesList().stream()
|
||||||
.map(i -> new UploadDirItem(i.getName(), i.getLastModifiedTime(), i.getIsDirectory(), i.getSize()))
|
.map(i -> new UploadDirItem(i.getName(), i.getLastModifiedTime(), i.getIsDirectory(), i.getSize()))
|
||||||
.toList();
|
.toList();
|
||||||
@ -268,7 +268,7 @@ public class ExecutorClient extends AbstractDynamicClient {
|
|||||||
|
|
||||||
public FileStorageContent listFileStorage(int node, FileStorageId fileId) {
|
public FileStorageContent listFileStorage(int node, FileStorageId fileId) {
|
||||||
try {
|
try {
|
||||||
var rs = stubPool.apiForNode(node).listFileStorage(
|
var rs = channelPool.apiForNode(node).listFileStorage(
|
||||||
RpcFileStorageId.newBuilder()
|
RpcFileStorageId.newBuilder()
|
||||||
.setFileStorageId(fileId.id())
|
.setFileStorageId(fileId.id())
|
||||||
.build()
|
.build()
|
||||||
|
@ -10,11 +10,12 @@ import java.util.concurrent.*;
|
|||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static io.grpc.ConnectivityState.SHUTDOWN;
|
||||||
|
|
||||||
/** A pool of gRPC stubs for a service, with a separate stub for each node.
|
/** A pool of gRPC stubs for a service, with a separate stub for each node.
|
||||||
* Manages broadcast-style request. */
|
* Manages broadcast-style request. */
|
||||||
public abstract class GrpcStubPool<STUB> {
|
public abstract class GrpcChannelPool<STUB> {
|
||||||
public GrpcStubPool(String serviceName) {
|
public GrpcChannelPool(String serviceName) {
|
||||||
|
|
||||||
this.serviceName = serviceName;
|
this.serviceName = serviceName;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -25,25 +26,34 @@ public abstract class GrpcStubPool<STUB> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>();
|
private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>();
|
||||||
private final Map<ServiceAndNode, STUB> apis = new ConcurrentHashMap<>();
|
|
||||||
private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
|
private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();
|
||||||
|
|
||||||
private final String serviceName;
|
private final String serviceName;
|
||||||
|
|
||||||
public GrpcStubPool(ServiceId serviceId) {
|
public GrpcChannelPool(ServiceId serviceId) {
|
||||||
this.serviceName = serviceId.serviceName;
|
this.serviceName = serviceId.serviceName;
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Get an API stub for the given node */
|
/** Get an API stub for the given node */
|
||||||
public STUB apiForNode(int node) {
|
public STUB apiForNode(int node) {
|
||||||
var san = new ServiceAndNode(serviceName, node);
|
return createStub(
|
||||||
return apis.computeIfAbsent(san, n ->
|
channels.compute(
|
||||||
createStub(channels.computeIfAbsent(san, this::createChannel))
|
new ServiceAndNode(serviceName, node),
|
||||||
|
this::refreshChannel)
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private ManagedChannel refreshChannel(ServiceAndNode serviceAndNode, ManagedChannel old) {
|
||||||
|
if (old == null || old.getState(true) != SHUTDOWN) {
|
||||||
|
return createChannel(serviceAndNode);
|
||||||
|
}
|
||||||
|
return old;
|
||||||
|
}
|
||||||
|
|
||||||
protected ManagedChannel createChannel(ServiceAndNode serviceAndNode) {
|
protected ManagedChannel createChannel(ServiceAndNode serviceAndNode) {
|
||||||
return ManagedChannelBuilder.forAddress(serviceAndNode.getHostName(), 81).usePlaintext().build();
|
return ManagedChannelBuilder.forAddress(serviceAndNode.getHostName(), 81)
|
||||||
|
.usePlaintext()
|
||||||
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Invoke a function on each node, returning a list of futures in a terminal state, as per
|
/** Invoke a function on each node, returning a list of futures in a terminal state, as per
|
@ -3,7 +3,7 @@ package nu.marginalia.query;
|
|||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
import io.grpc.stub.StreamObserver;
|
import io.grpc.stub.StreamObserver;
|
||||||
import nu.marginalia.client.grpc.GrpcStubPool;
|
import nu.marginalia.client.grpc.GrpcChannelPool;
|
||||||
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
|
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
|
||||||
import nu.marginalia.index.api.RpcDomainIdCount;
|
import nu.marginalia.index.api.RpcDomainIdCount;
|
||||||
import nu.marginalia.index.api.RpcDomainIdList;
|
import nu.marginalia.index.api.RpcDomainIdList;
|
||||||
@ -17,11 +17,11 @@ import java.util.List;
|
|||||||
|
|
||||||
public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase {
|
public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase {
|
||||||
private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class);
|
private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class);
|
||||||
private final GrpcStubPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> stubPool;
|
private final GrpcChannelPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> channelPool;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) {
|
public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) {
|
||||||
stubPool = new GrpcStubPool<>(ServiceId.Index) {
|
channelPool = new GrpcChannelPool<>(ServiceId.Index) {
|
||||||
@Override
|
@Override
|
||||||
public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) {
|
public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) {
|
||||||
return IndexDomainLinksApiGrpc.newBlockingStub(channel);
|
return IndexDomainLinksApiGrpc.newBlockingStub(channel);
|
||||||
@ -37,7 +37,7 @@ public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDo
|
|||||||
@Override
|
@Override
|
||||||
public void getAllLinks(nu.marginalia.index.api.Empty request,
|
public void getAllLinks(nu.marginalia.index.api.Empty request,
|
||||||
StreamObserver<RpcDomainIdPairs> responseObserver) {
|
StreamObserver<RpcDomainIdPairs> responseObserver) {
|
||||||
stubPool.callEachSequential(stub -> stub.getAllLinks(request))
|
channelPool.callEachSequential(stub -> stub.getAllLinks(request))
|
||||||
.forEach(
|
.forEach(
|
||||||
iter -> iter.forEachRemaining(responseObserver::onNext)
|
iter -> iter.forEachRemaining(responseObserver::onNext)
|
||||||
);
|
);
|
||||||
@ -50,7 +50,7 @@ public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDo
|
|||||||
StreamObserver<RpcDomainIdList> responseObserver) {
|
StreamObserver<RpcDomainIdList> responseObserver) {
|
||||||
var rspBuilder = RpcDomainIdList.newBuilder();
|
var rspBuilder = RpcDomainIdList.newBuilder();
|
||||||
|
|
||||||
stubPool.callEachSequential(stub -> stub.getLinksFromDomain(request))
|
channelPool.callEachSequential(stub -> stub.getLinksFromDomain(request))
|
||||||
.map(RpcDomainIdList::getDomainIdList)
|
.map(RpcDomainIdList::getDomainIdList)
|
||||||
.forEach(rspBuilder::addAllDomainId);
|
.forEach(rspBuilder::addAllDomainId);
|
||||||
|
|
||||||
@ -63,7 +63,7 @@ public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDo
|
|||||||
StreamObserver<RpcDomainIdList> responseObserver) {
|
StreamObserver<RpcDomainIdList> responseObserver) {
|
||||||
var rspBuilder = RpcDomainIdList.newBuilder();
|
var rspBuilder = RpcDomainIdList.newBuilder();
|
||||||
|
|
||||||
stubPool.callEachSequential(stub -> stub.getLinksToDomain(request))
|
channelPool.callEachSequential(stub -> stub.getLinksToDomain(request))
|
||||||
.map(RpcDomainIdList::getDomainIdList)
|
.map(RpcDomainIdList::getDomainIdList)
|
||||||
.forEach(rspBuilder::addAllDomainId);
|
.forEach(rspBuilder::addAllDomainId);
|
||||||
|
|
||||||
@ -75,7 +75,7 @@ public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDo
|
|||||||
public void countLinksFromDomain(nu.marginalia.index.api.RpcDomainId request,
|
public void countLinksFromDomain(nu.marginalia.index.api.RpcDomainId request,
|
||||||
StreamObserver<RpcDomainIdCount> responseObserver) {
|
StreamObserver<RpcDomainIdCount> responseObserver) {
|
||||||
|
|
||||||
int sum = stubPool.callEachSequential(stub -> stub.countLinksFromDomain(request))
|
int sum = channelPool.callEachSequential(stub -> stub.countLinksFromDomain(request))
|
||||||
.mapToInt(RpcDomainIdCount::getIdCount)
|
.mapToInt(RpcDomainIdCount::getIdCount)
|
||||||
.sum();
|
.sum();
|
||||||
|
|
||||||
@ -89,7 +89,7 @@ public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDo
|
|||||||
public void countLinksToDomain(nu.marginalia.index.api.RpcDomainId request,
|
public void countLinksToDomain(nu.marginalia.index.api.RpcDomainId request,
|
||||||
io.grpc.stub.StreamObserver<nu.marginalia.index.api.RpcDomainIdCount> responseObserver) {
|
io.grpc.stub.StreamObserver<nu.marginalia.index.api.RpcDomainIdCount> responseObserver) {
|
||||||
|
|
||||||
int sum = stubPool.callEachSequential(stub -> stub.countLinksToDomain(request))
|
int sum = channelPool.callEachSequential(stub -> stub.countLinksToDomain(request))
|
||||||
.mapToInt(RpcDomainIdCount::getIdCount)
|
.mapToInt(RpcDomainIdCount::getIdCount)
|
||||||
.sum();
|
.sum();
|
||||||
|
|
||||||
|
@ -4,7 +4,7 @@ import com.google.inject.Inject;
|
|||||||
import io.grpc.ManagedChannel;
|
import io.grpc.ManagedChannel;
|
||||||
import io.prometheus.client.Histogram;
|
import io.prometheus.client.Histogram;
|
||||||
import lombok.SneakyThrows;
|
import lombok.SneakyThrows;
|
||||||
import nu.marginalia.client.grpc.GrpcStubPool;
|
import nu.marginalia.client.grpc.GrpcChannelPool;
|
||||||
import nu.marginalia.db.DomainBlacklist;
|
import nu.marginalia.db.DomainBlacklist;
|
||||||
import nu.marginalia.index.api.*;
|
import nu.marginalia.index.api.*;
|
||||||
import nu.marginalia.model.id.UrlIdCodec;
|
import nu.marginalia.model.id.UrlIdCodec;
|
||||||
@ -27,7 +27,7 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
|||||||
.help("QS-side query time (GRPC endpoint)")
|
.help("QS-side query time (GRPC endpoint)")
|
||||||
.register();
|
.register();
|
||||||
|
|
||||||
private final GrpcStubPool<IndexApiGrpc.IndexApiBlockingStub> stubPool;
|
private final GrpcChannelPool<IndexApiGrpc.IndexApiBlockingStub> channelPool;
|
||||||
|
|
||||||
private final QueryFactory queryFactory;
|
private final QueryFactory queryFactory;
|
||||||
private final DomainBlacklist blacklist;
|
private final DomainBlacklist blacklist;
|
||||||
@ -40,7 +40,7 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
|||||||
this.queryFactory = queryFactory;
|
this.queryFactory = queryFactory;
|
||||||
this.blacklist = blacklist;
|
this.blacklist = blacklist;
|
||||||
|
|
||||||
stubPool = new GrpcStubPool<>(ServiceId.Index) {
|
channelPool = new GrpcChannelPool<>(ServiceId.Index) {
|
||||||
@Override
|
@Override
|
||||||
public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) {
|
public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) {
|
||||||
return IndexApiGrpc.newBlockingStub(channel);
|
return IndexApiGrpc.newBlockingStub(channel);
|
||||||
@ -89,7 +89,7 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
|||||||
|
|
||||||
@SneakyThrows
|
@SneakyThrows
|
||||||
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) {
|
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) {
|
||||||
return stubPool.invokeAll(stub -> new QueryTask(stub, indexRequest))
|
return channelPool.invokeAll(stub -> new QueryTask(stub, indexRequest))
|
||||||
.stream()
|
.stream()
|
||||||
.filter(f -> f.state() == Future.State.SUCCESS)
|
.filter(f -> f.state() == Future.State.SUCCESS)
|
||||||
.map(Future::resultNow)
|
.map(Future::resultNow)
|
||||||
|
Loading…
Reference in New Issue
Block a user