Merge pull request #71 from MarginaliaSearch/metrics
Add Prometheus Instrumentation
This commit is contained in:
commit
7920c67a48
@ -2,6 +2,8 @@ package nu.marginalia.api;
|
||||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.inject.Inject;
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.Histogram;
|
||||
import nu.marginalia.api.model.ApiLicense;
|
||||
import nu.marginalia.api.model.ApiSearchResults;
|
||||
import nu.marginalia.api.svc.LicenseService;
|
||||
@ -34,6 +36,23 @@ public class ApiService extends Service {
|
||||
// Marker for filtering out sensitive content from the persistent logs
|
||||
private final Marker queryMarker = MarkerFactory.getMarker("QUERY");
|
||||
|
||||
private final Counter wmsa_api_timeout_count = Counter.build()
|
||||
.name("wmsa_api_timeout_count")
|
||||
.labelNames("key")
|
||||
.help("API timeout count")
|
||||
.register();
|
||||
private final Counter wmsa_api_cache_hit_count = Counter.build()
|
||||
.name("wmsa_api_cache_hit_count")
|
||||
.labelNames("key")
|
||||
.help("API cache hit count")
|
||||
.register();
|
||||
|
||||
private static final Histogram wmsa_api_query_time = Histogram.build()
|
||||
.name("wmsa_api_query_time")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.help("API-side query time")
|
||||
.register();
|
||||
|
||||
@Inject
|
||||
public ApiService(BaseServiceParams params,
|
||||
QueryClient queryClient,
|
||||
@ -83,6 +102,7 @@ public class ApiService extends Service {
|
||||
|
||||
var cachedResponse = responseCache.getResults(license, args[0], request.queryString());
|
||||
if (cachedResponse.isPresent()) {
|
||||
wmsa_api_cache_hit_count.labels(license.key).inc();
|
||||
return cachedResponse.get();
|
||||
}
|
||||
|
||||
@ -98,6 +118,7 @@ public class ApiService extends Service {
|
||||
|
||||
private ApiSearchResults doSearch(ApiLicense license, String query, Request request) {
|
||||
if (!rateLimiterService.isAllowed(license)) {
|
||||
wmsa_api_timeout_count.labels(license.key).inc();
|
||||
Spark.halt(503, "Slow down");
|
||||
}
|
||||
|
||||
@ -106,9 +127,13 @@ public class ApiService extends Service {
|
||||
|
||||
logger.info(queryMarker, "{} Search {}", license.key, query);
|
||||
|
||||
return searchOperator
|
||||
.query(Context.fromRequest(request), query, count, index)
|
||||
.withLicense(license.getLicense());
|
||||
return wmsa_api_query_time
|
||||
.labels(license.key)
|
||||
.time(() ->
|
||||
searchOperator
|
||||
.query(Context.fromRequest(request), query, count, index)
|
||||
.withLicense(license.getLicense())
|
||||
);
|
||||
}
|
||||
|
||||
private int intParam(Request request, String name, int defaultValue) {
|
||||
|
@ -1,6 +1,8 @@
|
||||
package nu.marginalia.search;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.prometheus.client.Counter;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.WebsiteUrl;
|
||||
import nu.marginalia.client.Context;
|
||||
@ -11,6 +13,7 @@ import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import spark.Request;
|
||||
import spark.Response;
|
||||
import spark.Route;
|
||||
import spark.Spark;
|
||||
|
||||
import java.net.URLEncoder;
|
||||
@ -22,6 +25,17 @@ public class SearchService extends Service {
|
||||
private final StaticResources staticResources;
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(SearchService.class);
|
||||
private static final Histogram wmsa_search_service_request_time = Histogram.build()
|
||||
.name("wmsa_search_service_request_time")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.labelNames("matchedPath", "method")
|
||||
.help("Search service request time (seconds)")
|
||||
.register();
|
||||
private static final Counter wmsa_search_service_error_count = Counter.build()
|
||||
.name("wmsa_search_service_error_count")
|
||||
.labelNames("matchedPath", "method")
|
||||
.help("Search service error count")
|
||||
.register();
|
||||
|
||||
@SneakyThrows
|
||||
@Inject
|
||||
@ -42,30 +56,56 @@ public class SearchService extends Service {
|
||||
|
||||
Spark.staticFiles.expireTime(600);
|
||||
|
||||
Spark.get("/search", searchQueryService::pathSearch);
|
||||
SearchServiceMetrics.get("/search", searchQueryService::pathSearch);
|
||||
SearchServiceMetrics.get("/public/search", searchQueryService::pathSearch);
|
||||
|
||||
Spark.get("/public/search", searchQueryService::pathSearch);
|
||||
Spark.get("/public/", frontPageService::render);
|
||||
Spark.get("/public/news.xml", frontPageService::renderNewsFeed);
|
||||
Spark.get("/public/:resource", this::serveStatic);
|
||||
SearchServiceMetrics.get("/public/", frontPageService::render);
|
||||
SearchServiceMetrics.get("/public/news.xml", frontPageService::renderNewsFeed);
|
||||
SearchServiceMetrics.get("/public/:resource", this::serveStatic);
|
||||
|
||||
Spark.post("/public/site/suggest/", addToCrawlQueueService::suggestCrawling);
|
||||
SearchServiceMetrics.post("/public/site/suggest/", addToCrawlQueueService::suggestCrawling);
|
||||
|
||||
Spark.get("/public/site-search/:site/*", this::siteSearchRedir);
|
||||
SearchServiceMetrics.get("/public/site-search/:site/*", this::siteSearchRedir);
|
||||
|
||||
Spark.get("/public/site/:site", siteInfoService::handle);
|
||||
Spark.post("/public/site/:site", siteInfoService::handlePost);
|
||||
SearchServiceMetrics.get("/public/site/:site", siteInfoService::handle);
|
||||
SearchServiceMetrics.post("/public/site/:site", siteInfoService::handlePost);
|
||||
|
||||
Spark.get("/public/crosstalk/", crosstalkService::handle);
|
||||
SearchServiceMetrics.get("/public/crosstalk/", crosstalkService::handle);
|
||||
|
||||
Spark.exception(Exception.class, (e,p,q) -> {
|
||||
logger.error("Error during processing", e);
|
||||
wmsa_search_service_error_count.labels(p.pathInfo(), p.requestMethod()).inc();
|
||||
errorPageService.serveError(Context.fromRequest(p), p, q);
|
||||
});
|
||||
|
||||
Spark.awaitInitialization();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/** Wraps a route with a timer and a counter */
|
||||
private static class SearchServiceMetrics implements Route {
|
||||
private final Route delegatedRoute;
|
||||
|
||||
static void get(String path, Route route) {
|
||||
Spark.get(path, new SearchServiceMetrics(route));
|
||||
}
|
||||
static void post(String path, Route route) {
|
||||
Spark.post(path, new SearchServiceMetrics(route));
|
||||
}
|
||||
|
||||
private SearchServiceMetrics(Route delegatedRoute) {
|
||||
this.delegatedRoute = delegatedRoute;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object handle(Request request, Response response) throws Exception {
|
||||
return wmsa_search_service_request_time
|
||||
.labels(request.pathInfo(), request.requestMethod())
|
||||
.time(() -> delegatedRoute.handle(request, response));
|
||||
}
|
||||
}
|
||||
|
||||
private Object serveStatic(Request request, Response response) {
|
||||
String resource = request.params("resource");
|
||||
staticResources.serveStatic("search", resource, request, response);
|
||||
|
@ -1,6 +1,7 @@
|
||||
package nu.marginalia.search.svc;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.WebsiteUrl;
|
||||
import nu.marginalia.search.command.SearchAdtechParameter;
|
||||
|
@ -55,19 +55,19 @@ public class IndexQueryService extends IndexApiImplBase {
|
||||
private final Marker queryMarker = MarkerFactory.getMarker("QUERY");
|
||||
|
||||
private static final Counter wmsa_query_timeouts = Counter.build()
|
||||
.name("wmsa_query_timeouts")
|
||||
.name("wmsa_index_query_timeouts")
|
||||
.help("Query timeout counter")
|
||||
.labelNames("node")
|
||||
.labelNames("node", "api")
|
||||
.register();
|
||||
private static final Gauge wmsa_query_cost = Gauge.build()
|
||||
.name("wmsa_query_cost")
|
||||
.name("wmsa_index_query_cost")
|
||||
.help("Computational cost of query")
|
||||
.labelNames("node")
|
||||
.labelNames("node", "api")
|
||||
.register();
|
||||
private static final Histogram wmsa_query_time = Histogram.build()
|
||||
.name("wmsa_query_time")
|
||||
.linearBuckets(50., 50., 15)
|
||||
.labelNames("node")
|
||||
.name("wmsa_index_query_time")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.labelNames("node", "api")
|
||||
.help("Index-side query time")
|
||||
.register();
|
||||
|
||||
@ -138,7 +138,7 @@ public class IndexQueryService extends IndexApiImplBase {
|
||||
|
||||
try {
|
||||
return wmsa_query_time
|
||||
.labels(nodeName)
|
||||
.labels(nodeName, "REST")
|
||||
.time(() -> {
|
||||
var params = new SearchParameters(specsSet, getSearchSet(specsSet));
|
||||
|
||||
@ -147,11 +147,12 @@ public class IndexQueryService extends IndexApiImplBase {
|
||||
logger.info(queryMarker, "Index Result Count: {}", results.size());
|
||||
|
||||
wmsa_query_cost
|
||||
.labels(nodeName)
|
||||
.labels(nodeName, "REST")
|
||||
.set(params.getDataCost());
|
||||
|
||||
if (!params.hasTimeLeft()) {
|
||||
wmsa_query_timeouts
|
||||
.labels(nodeName)
|
||||
.labels(nodeName, "REST")
|
||||
.inc();
|
||||
}
|
||||
|
||||
@ -179,7 +180,22 @@ public class IndexQueryService extends IndexApiImplBase {
|
||||
try {
|
||||
var params = new SearchParameters(request, getSearchSet(request));
|
||||
|
||||
SearchResultSet results = executeSearch(params);
|
||||
final String nodeName = Integer.toString(nodeId);
|
||||
|
||||
SearchResultSet results = wmsa_query_time
|
||||
.labels(nodeName, "GRPC")
|
||||
.time(() -> executeSearch(params));
|
||||
|
||||
wmsa_query_cost
|
||||
.labels(nodeName, "GRPC")
|
||||
.set(params.getDataCost());
|
||||
|
||||
if (!params.hasTimeLeft()) {
|
||||
wmsa_query_timeouts
|
||||
.labels(nodeName, "GRPC")
|
||||
.inc();
|
||||
}
|
||||
|
||||
for (var result : results.results) {
|
||||
|
||||
var rawResult = result.rawIndexResult;
|
||||
|
@ -3,6 +3,8 @@ package nu.marginalia.query;
|
||||
import com.google.inject.Inject;
|
||||
import io.grpc.ManagedChannel;
|
||||
import io.grpc.ManagedChannelBuilder;
|
||||
import io.prometheus.client.Histogram;
|
||||
import lombok.SneakyThrows;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
import nu.marginalia.index.api.*;
|
||||
import nu.marginalia.model.id.UrlIdCodec;
|
||||
@ -19,6 +21,12 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
||||
|
||||
private final Logger logger = LoggerFactory.getLogger(QueryGRPCService.class);
|
||||
|
||||
private static final Histogram wmsa_qs_query_time_grpc = Histogram.build()
|
||||
.name("wmsa_qs_query_time_grpc")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.help("QS-side query time (GRPC endpoint)")
|
||||
.register();
|
||||
|
||||
private final Map<ServiceAndNode, ManagedChannel> channels
|
||||
= new ConcurrentHashMap<>();
|
||||
private final Map<ServiceAndNode, IndexApiGrpc.IndexApiBlockingStub> actorRpcApis
|
||||
@ -61,23 +69,25 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
||||
io.grpc.stub.StreamObserver<nu.marginalia.index.api.RpcQsResponse> responseObserver)
|
||||
{
|
||||
try {
|
||||
var params = QueryProtobufCodec.convertRequest(request);
|
||||
var query = queryFactory.createQuery(params);
|
||||
wmsa_qs_query_time_grpc.time(() -> {
|
||||
var params = QueryProtobufCodec.convertRequest(request);
|
||||
var query = queryFactory.createQuery(params);
|
||||
|
||||
RpcIndexQuery indexRequest = QueryProtobufCodec.convertQuery(request, query);
|
||||
List<RpcDecoratedResultItem> bestItems = executeQueries(indexRequest, request.getQueryLimits().getResultsTotal());
|
||||
RpcIndexQuery indexRequest = QueryProtobufCodec.convertQuery(request, query);
|
||||
List<RpcDecoratedResultItem> bestItems = executeQueries(indexRequest, request.getQueryLimits().getResultsTotal());
|
||||
|
||||
var responseBuilder = RpcQsResponse.newBuilder()
|
||||
.addAllResults(bestItems)
|
||||
.setSpecs(indexRequest)
|
||||
.addAllSearchTermsHuman(query.searchTermsHuman);
|
||||
var responseBuilder = RpcQsResponse.newBuilder()
|
||||
.addAllResults(bestItems)
|
||||
.setSpecs(indexRequest)
|
||||
.addAllSearchTermsHuman(query.searchTermsHuman);
|
||||
|
||||
if (query.domain != null)
|
||||
responseBuilder.setDomain(query.domain);
|
||||
if (query.domain != null)
|
||||
responseBuilder.setDomain(query.domain);
|
||||
|
||||
responseObserver.onNext(responseBuilder.build());
|
||||
responseObserver.onNext(responseBuilder.build());
|
||||
|
||||
responseObserver.onCompleted();
|
||||
responseObserver.onCompleted();
|
||||
});
|
||||
} catch (Exception e) {
|
||||
logger.error("Exception", e);
|
||||
responseObserver.onError(e);
|
||||
@ -89,8 +99,8 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
|
||||
private static final Comparator<RpcDecoratedResultItem> comparator =
|
||||
Comparator.comparing(RpcDecoratedResultItem::getRankingScore);
|
||||
|
||||
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) throws InterruptedException
|
||||
{
|
||||
@SneakyThrows
|
||||
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) {
|
||||
List<Callable<List<RpcDecoratedResultItem>>> tasks = createTasks(indexRequest);
|
||||
|
||||
return es.invokeAll(tasks).stream()
|
||||
|
@ -3,13 +3,13 @@ package nu.marginalia.query;
|
||||
import com.google.gson.Gson;
|
||||
import com.google.inject.Inject;
|
||||
import io.grpc.ServerBuilder;
|
||||
import io.prometheus.client.Histogram;
|
||||
import nu.marginalia.client.Context;
|
||||
import nu.marginalia.db.DomainBlacklist;
|
||||
import nu.marginalia.index.client.IndexClient;
|
||||
import nu.marginalia.index.client.model.query.SearchSpecification;
|
||||
import nu.marginalia.index.client.model.results.DecoratedSearchResultItem;
|
||||
import nu.marginalia.index.client.model.results.SearchResultSet;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import nu.marginalia.query.model.QueryParams;
|
||||
import nu.marginalia.query.model.QueryResponse;
|
||||
import nu.marginalia.query.svc.NodeConfigurationWatcher;
|
||||
@ -21,9 +21,7 @@ import spark.Response;
|
||||
import spark.Spark;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class QueryService extends Service {
|
||||
|
||||
@ -33,6 +31,13 @@ public class QueryService extends Service {
|
||||
private final DomainBlacklist blacklist;
|
||||
private final QueryFactory queryFactory;
|
||||
|
||||
private static final Histogram wmsa_qs_query_time_rest = Histogram.build()
|
||||
.name("wmsa_qs_query_time_rest")
|
||||
.linearBuckets(0.005, 0.005, 15)
|
||||
.help("QS-side query time (REST endpoint)")
|
||||
.register();
|
||||
|
||||
|
||||
@Inject
|
||||
public QueryService(BaseServiceParams params,
|
||||
IndexClient indexClient,
|
||||
@ -58,23 +63,25 @@ public class QueryService extends Service {
|
||||
}
|
||||
|
||||
private Object search(Request request, Response response) {
|
||||
String json = request.body();
|
||||
QueryParams params = gson.fromJson(json, QueryParams.class);
|
||||
return wmsa_qs_query_time_rest.time(() -> {
|
||||
String json = request.body();
|
||||
QueryParams params = gson.fromJson(json, QueryParams.class);
|
||||
|
||||
var query = queryFactory.createQuery(params);
|
||||
var rsp = executeQuery(Context.fromRequest(request), query.specs);
|
||||
var query = queryFactory.createQuery(params);
|
||||
var rsp = executeQuery(Context.fromRequest(request), query.specs);
|
||||
|
||||
rsp.results.removeIf(this::isBlacklisted);
|
||||
rsp.results.removeIf(this::isBlacklisted);
|
||||
|
||||
response.type("application/json");
|
||||
response.type("application/json");
|
||||
|
||||
return new QueryResponse(
|
||||
query.specs,
|
||||
rsp.results,
|
||||
query.searchTermsHuman,
|
||||
List.of(), // no problems
|
||||
query.domain
|
||||
);
|
||||
return new QueryResponse(
|
||||
query.specs,
|
||||
rsp.results,
|
||||
query.searchTermsHuman,
|
||||
List.of(), // no problems
|
||||
query.domain
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
private SearchResultSet delegateToIndex(Request request, Response response) {
|
||||
|
@ -8,10 +8,8 @@ x-svc: &service
|
||||
- logs:/var/log/wmsa
|
||||
networks:
|
||||
- wmsa
|
||||
depends_on:
|
||||
mariadb:
|
||||
condition: service_healthy
|
||||
|
||||
labels:
|
||||
- "__meta_docker_port_private=7000"
|
||||
x-p1: &partition-1
|
||||
env_file:
|
||||
- "run/env/service.env"
|
||||
@ -201,6 +199,18 @@ services:
|
||||
- "/var/run/docker.sock:/var/run/docker.sock:ro"
|
||||
networks:
|
||||
- wmsa
|
||||
prometheus:
|
||||
image: "prom/prometheus"
|
||||
container_name: "prometheus"
|
||||
command:
|
||||
- "--config.file=/etc/prometheus/prometheus.yml"
|
||||
ports:
|
||||
- "127.0.0.1:8091:9090"
|
||||
volumes:
|
||||
- "./run/prometheus.yml:/etc/prometheus/prometheus.yml"
|
||||
- "/var/run/docker.sock:/var/run/docker.sock:ro"
|
||||
networks:
|
||||
- wmsa
|
||||
networks:
|
||||
wmsa:
|
||||
volumes:
|
||||
|
13
run/prometheus.yml
Normal file
13
run/prometheus.yml
Normal file
@ -0,0 +1,13 @@
|
||||
global:
|
||||
scrape_interval: 15s
|
||||
evaluation_interval: 15s
|
||||
|
||||
scrape_configs:
|
||||
- job_name: docker
|
||||
docker_sd_configs:
|
||||
- host: unix:///var/run/docker.sock
|
||||
relabel_configs:
|
||||
- source_labels:
|
||||
- '__meta_docker_network_ip'
|
||||
target_label: '__address__'
|
||||
replacement: '$1:7000'
|
Loading…
Reference in New Issue
Block a user