(prometheus) Add instrumentation to the search, qs and index services.
This commit is contained in:
parent
116595d218
commit
31232e49fb
5 changed files with 125 additions and 51 deletions
|
@ -1,6 +1,8 @@
|
|||
package nu.marginalia.search;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.WebsiteUrl;
|
||||
import nu.marginalia.client.Context;
|
||||
|
@ -11,6 +13,7 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Route;
|
||||
import spark.Spark;
|
||||
|
||||
import java.net.URLEncoder;
|
||||
|
@ -22,6 +25,17 @@ public class SearchService extends Service {
|
|||
private final StaticResources staticResources;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SearchService.class);
|
||||
private static final Histogram wmsa_search_service_request_time = Histogram.build()
|
||||
.name("wmsa_search_service_request_time")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.labelNames("matchedPath", "method")
|
||||
.help("Search service request time (seconds)")
|
||||
.register();
|
||||
private static final Counter wmsa_search_service_error_count = Counter.build()
|
||||
.name("wmsa_search_service_error_count")
|
||||
.labelNames("matchedPath", "method")
|
||||
.help("Search service error count")
|
||||
.register();
|
||||
|
||||
@SneakyThrows
|
||||
@Inject
|
||||
|
@ -42,30 +56,56 @@ public class SearchService extends Service {
|
|||
|
||||
Spark.staticFiles.expireTime(600);
|
||||
|
||||
Spark.get("/search", searchQueryService::pathSearch);
|
||||
SearchServiceMetrics.get("/search", searchQueryService::pathSearch);
|
||||
SearchServiceMetrics.get("/public/search", searchQueryService::pathSearch);
|
||||
|
||||
Spark.get("/public/search", searchQueryService::pathSearch);
|
||||
Spark.get("/public/", frontPageService::render);
|
||||
Spark.get("/public/news.xml", frontPageService::renderNewsFeed);
|
||||
Spark.get("/public/:resource", this::serveStatic);
|
||||
SearchServiceMetrics.get("/public/", frontPageService::render);
|
||||
SearchServiceMetrics.get("/public/news.xml", frontPageService::renderNewsFeed);
|
||||
SearchServiceMetrics.get("/public/:resource", this::serveStatic);
|
||||
|
||||
Spark.post("/public/site/suggest/", addToCrawlQueueService::suggestCrawling);
|
||||
SearchServiceMetrics.post("/public/site/suggest/", addToCrawlQueueService::suggestCrawling);
|
||||
|
||||
Spark.get("/public/site-search/:site/*", this::siteSearchRedir);
|
||||
SearchServiceMetrics.get("/public/site-search/:site/*", this::siteSearchRedir);
|
||||
|
||||
Spark.get("/public/site/:site", siteInfoService::handle);
|
||||
Spark.post("/public/site/:site", siteInfoService::handlePost);
|
||||
SearchServiceMetrics.get("/public/site/:site", siteInfoService::handle);
|
||||
SearchServiceMetrics.post("/public/site/:site", siteInfoService::handlePost);
|
||||
|
||||
Spark.get("/public/crosstalk/", crosstalkService::handle);
|
||||
SearchServiceMetrics.get("/public/crosstalk/", crosstalkService::handle);
|
||||
|
||||
Spark.exception(Exception.class, (e,p,q) -> {
|
||||
logger.error("Error during processing", e);
|
||||
wmsa_search_service_error_count.labels(p.pathInfo(), p.requestMethod()).inc();
|
||||
errorPageService.serveError(Context.fromRequest(p), p, q);
|
||||
});
|
||||
|
||||
Spark.awaitInitialization();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/** Wraps a route with a timer and a counter */
|
||||
private static class SearchServiceMetrics implements Route {
|
||||
private final Route delegatedRoute;
|
||||
|
||||
static void get(String path, Route route) {
|
||||
Spark.get(path, new SearchServiceMetrics(route));
|
||||
}
|
||||
static void post(String path, Route route) {
|
||||
Spark.post(path, new SearchServiceMetrics(route));
|
||||
}
|
||||
|
||||
private SearchServiceMetrics(Route delegatedRoute) {
|
||||
this.delegatedRoute = delegatedRoute;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object handle(Request request, Response response) throws Exception {
|
||||
return wmsa_search_service_request_time
|
||||
.labels(request.pathInfo(), request.requestMethod())
|
||||
.time(() -> delegatedRoute.handle(request, response));
|
||||
}
|
||||
}
|
||||
|
||||
private Object serveStatic(Request request, Response response) {
|
||||
String resource = request.params("resource");
|
||||
staticResources.serveStatic("search", resource, request, response);
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
package nu.marginalia.search.svc;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.WebsiteUrl;
|
||||
import nu.marginalia.search.command.SearchAdtechParameter;
|
||||
|
|
|
@ -55,19 +55,19 @@ public class IndexQueryService extends IndexApiImplBase {
|
|||
private final Marker queryMarker = MarkerFactory.getMarker("QUERY");
|
||||
|
||||
private static final Counter wmsa_query_timeouts = Counter.build()
|
||||
.name("wmsa_query_timeouts")
|
||||
.name("wmsa_index_query_timeouts")
|
||||
.help("Query timeout counter")
|
||||
.labelNames("node")
|
||||
.labelNames("node", "api")
|
||||
.register();
|
||||
private static final Gauge wmsa_query_cost = Gauge.build()
|
||||
.name("wmsa_query_cost")
|
||||
.name("wmsa_index_query_cost")
|
||||
.help("Computational cost of query")
|
||||
.labelNames("node")
|
||||
.labelNames("node", "api")
|
||||
.register();
|
||||
private static final Histogram wmsa_query_time = Histogram.build()
|
||||
.name("wmsa_query_time")
|
||||
.linearBuckets(50., 50., 15)
|
||||
.labelNames("node")
|
||||
.name("wmsa_index_query_time")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.labelNames("node", "api")
|
||||
.help("Index-side query time")
|
||||
.register();
|
||||
|
||||
|
@ -138,7 +138,7 @@ public class IndexQueryService extends IndexApiImplBase {
|
|||
|
||||
try {
|
||||
return wmsa_query_time
|
||||
.labels(nodeName)
|
||||
.labels(nodeName, "REST")
|
||||
.time(() -> {
|
||||
var params = new SearchParameters(specsSet, getSearchSet(specsSet));
|
||||
|
||||
|
@ -147,11 +147,12 @@ public class IndexQueryService extends IndexApiImplBase {
|
|||
logger.info(queryMarker, "Index Result Count: {}", results.size());
|
||||
|
||||
wmsa_query_cost
|
||||
.labels(nodeName)
|
||||
.labels(nodeName, "REST")
|
||||
.set(params.getDataCost());
|
||||
|
||||
if (!params.hasTimeLeft()) {
|
||||
wmsa_query_timeouts
|
||||
.labels(nodeName)
|
||||
.labels(nodeName, "REST")
|
||||
.inc();
|
||||
}
|
||||
|
||||
|
@ -179,7 +180,22 @@ public class IndexQueryService extends IndexApiImplBase {
|
|||
try {
|
||||
var params = new SearchParameters(request, getSearchSet(request));
|
||||
|
||||
SearchResultSet results = executeSearch(params);
|
||||
final String nodeName = Integer.toString(nodeId);
|
||||
|
||||
SearchResultSet results = wmsa_query_time
|
||||
.labels(nodeName, "GRPC")
|
||||
.time(() -> executeSearch(params));
|
||||
|
||||
wmsa_query_cost
|
||||
.labels(nodeName, "GRPC")
|
||||
.set(params.getDataCost());
|
||||
|
||||
if (!params.hasTimeLeft()) {
|
||||
wmsa_query_timeouts
|
||||
.labels(nodeName, "GRPC")
|
||||
.inc();
|
||||
}
|
||||
|
||||
for (var result : results.results) {
|
||||
|
||||
var rawResult = result.rawIndexResult;
|
||||
|
|
|
@ -3,6 +3,8 @@ package nu.marginalia.query;
|
|||
import com.google.inject.Inject;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
import nu.marginalia.index.api.*;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
|
@ -19,6 +21,12 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
|||
|
||||
private final Logger logger = LoggerFactory.getLogger(QueryGRPCService.class);
|
||||
|
||||
private static final Histogram wmsa_qs_query_time_grpc = Histogram.build()
|
||||
.name("wmsa_qs_query_time_grpc")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.help("QS-side query time (GRPC endpoint)")
|
||||
.register();
|
||||
|
||||
private final Map<ServiceAndNode, ManagedChannel> channels
|
||||
= new ConcurrentHashMap<>();
|
||||
private final Map<ServiceAndNode, IndexApiGrpc.IndexApiBlockingStub> actorRpcApis
|
||||
|
@ -61,23 +69,25 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
|||
io.grpc.stub.StreamObserver<nu.marginalia.index.api.RpcQsResponse> responseObserver)
|
||||
{
|
||||
try {
|
||||
var params = QueryProtobufCodec.convertRequest(request);
|
||||
var query = queryFactory.createQuery(params);
|
||||
wmsa_qs_query_time_grpc.time(() -> {
|
||||
var params = QueryProtobufCodec.convertRequest(request);
|
||||
var query = queryFactory.createQuery(params);
|
||||
|
||||
RpcIndexQuery indexRequest = QueryProtobufCodec.convertQuery(request, query);
|
||||
List<RpcDecoratedResultItem> bestItems = executeQueries(indexRequest, request.getQueryLimits().getResultsTotal());
|
||||
RpcIndexQuery indexRequest = QueryProtobufCodec.convertQuery(request, query);
|
||||
List<RpcDecoratedResultItem> bestItems = executeQueries(indexRequest, request.getQueryLimits().getResultsTotal());
|
||||
|
||||
var responseBuilder = RpcQsResponse.newBuilder()
|
||||
.addAllResults(bestItems)
|
||||
.setSpecs(indexRequest)
|
||||
.addAllSearchTermsHuman(query.searchTermsHuman);
|
||||
var responseBuilder = RpcQsResponse.newBuilder()
|
||||
.addAllResults(bestItems)
|
||||
.setSpecs(indexRequest)
|
||||
.addAllSearchTermsHuman(query.searchTermsHuman);
|
||||
|
||||
if (query.domain != null)
|
||||
responseBuilder.setDomain(query.domain);
|
||||
if (query.domain != null)
|
||||
responseBuilder.setDomain(query.domain);
|
||||
|
||||
responseObserver.onNext(responseBuilder.build());
|
||||
responseObserver.onNext(responseBuilder.build());
|
||||
|
||||
responseObserver.onCompleted();
|
||||
responseObserver.onCompleted();
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.error("Exception", e);
|
||||
responseObserver.onError(e);
|
||||
|
@ -89,8 +99,8 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
|||
private static final Comparator<RpcDecoratedResultItem> comparator =
|
||||
Comparator.comparing(RpcDecoratedResultItem::getRankingScore);
|
||||
|
||||
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) throws InterruptedException
|
||||
{
|
||||
@SneakyThrows
|
||||
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) {
|
||||
List<Callable<List<RpcDecoratedResultItem>>> tasks = createTasks(indexRequest);
|
||||
|
||||
return es.invokeAll(tasks).stream()
|
||||
|
|
|
@ -3,13 +3,13 @@ package nu.marginalia.query;
|
|||
import com.google.gson.Gson;
|
||||
import com.google.inject.Inject;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.prometheus.client.Histogram;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
import nu.marginalia.index.client.IndexClient;
|
||||
import nu.marginalia.index.client.model.query.SearchSpecification;
|
||||
import nu.marginalia.index.client.model.results.DecoratedSearchResultItem;
|
||||
import nu.marginalia.index.client.model.results.SearchResultSet;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.query.model.QueryParams;
|
||||
import nu.marginalia.query.model.QueryResponse;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
|
@ -21,9 +21,7 @@ import spark.Response;
|
|||
import spark.Spark;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class QueryService extends Service {
|
||||
|
||||
|
@ -33,6 +31,13 @@ public class QueryService extends Service {
|
|||
private final DomainBlacklist blacklist;
|
||||
private final QueryFactory queryFactory;
|
||||
|
||||
private static final Histogram wmsa_qs_query_time_rest = Histogram.build()
|
||||
.name("wmsa_qs_query_time_rest")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.help("QS-side query time (REST endpoint)")
|
||||
.register();
|
||||
|
||||
|
||||
@Inject
|
||||
public QueryService(BaseServiceParams params,
|
||||
IndexClient indexClient,
|
||||
|
@ -58,23 +63,25 @@ public class QueryService extends Service {
|
|||
}
|
||||
|
||||
private Object search(Request request, Response response) {
|
||||
String json = request.body();
|
||||
QueryParams params = gson.fromJson(json, QueryParams.class);
|
||||
return wmsa_qs_query_time_rest.time(() -> {
|
||||
String json = request.body();
|
||||
QueryParams params = gson.fromJson(json, QueryParams.class);
|
||||
|
||||
var query = queryFactory.createQuery(params);
|
||||
var rsp = executeQuery(Context.fromRequest(request), query.specs);
|
||||
var query = queryFactory.createQuery(params);
|
||||
var rsp = executeQuery(Context.fromRequest(request), query.specs);
|
||||
|
||||
rsp.results.removeIf(this::isBlacklisted);
|
||||
rsp.results.removeIf(this::isBlacklisted);
|
||||
|
||||
response.type("application/json");
|
||||
response.type("application/json");
|
||||
|
||||
return new QueryResponse(
|
||||
query.specs,
|
||||
rsp.results,
|
||||
query.searchTermsHuman,
|
||||
List.of(), // no problems
|
||||
query.domain
|
||||
);
|
||||
return new QueryResponse(
|
||||
query.specs,
|
||||
rsp.results,
|
||||
query.searchTermsHuman,
|
||||
List.of(), // no problems
|
||||
query.domain
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private SearchResultSet delegateToIndex(Request request, Response response) {
|
||||
|
|
Loading…
Reference in a new issue