(qs) Parallel execution

This commit is contained in:
Viktor Lofgren 2023-10-23 12:06:03 +02:00
parent efb73ff4e7
commit 731afcb864

View File

@ -8,6 +8,7 @@ import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.client.exception.RouteNotConfiguredException;
import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory;
@ -56,12 +57,15 @@ public class IndexClient extends AbstractDynamicClient {
@CheckReturnValue
public SearchResultSet query(Context ctx, List<Integer> nodes, SearchSpecification specs) {
return Observable.fromIterable(nodes)
.concatMap(node -> this
.postGet(ctx, node,"/search/", specs, SearchResultSet.class)
.onErrorReturn(t -> new SearchResultSet()),
nodes.size(),
Schedulers.io()
)
.flatMap(node -> {
try {
return this
.postGet(ctx, node, "/search/", specs, SearchResultSet.class).onErrorReturn(t -> new SearchResultSet())
.observeOn(Schedulers.io());
} catch (RouteNotConfiguredException ex) {
return Observable.error(ex);
}
})
.reduce(SearchResultSet::combine)
.blockingGet();
}