(*) Replace EC_DOMAIN_LINK table with files and in-memory caching

The EC_DOMAIN_LINK MariaDB table stores links between domains.  This is problematic, as both updating and querying this table are very slow in relation to how small the data is (~10 GB).  This slowness is largely caused by the database enforcing ACID guarantees we don't particularly need.

This changeset replaces the EC_DOMAIN_LINK table with a file in each index node containing 32 bit integer pairs corresponding to links between two domains.  This file is loaded in memory in each node, and can be queried via the Query Service.

A migration step is needed before this file is created in each node.   Until that happens, the actual data is loaded from the EC_DOMAIN_LINK table, but accessed as though it was a file.

The changeset also migrates/renames the links.db file to documents.db to avoid naming confusion between the two.
This commit is contained in:
Viktor Lofgren 2024-01-08 15:53:13 +01:00
parent d304c10641
commit edc1acbb7e
56 changed files with 1261 additions and 1176 deletions

View File

@ -4,6 +4,28 @@ package actorapi;
option java_package="nu.marginalia.index.api"; option java_package="nu.marginalia.index.api";
option java_multiple_files=true; option java_multiple_files=true;
// API for querying the in-memory domain link database hosted on each node.
service IndexDomainLinksApi {
// Streams every known (source, dest) domain id pair.
rpc getAllLinks(Empty) returns (stream RpcDomainIdPairs) {}
// Outgoing links: all destinations for the given source domain.
rpc getLinksFromDomain(RpcDomainId) returns (RpcDomainIdList) {}
// Incoming links: all sources pointing at the given destination domain.
rpc getLinksToDomain(RpcDomainId) returns (RpcDomainIdList) {}
rpc countLinksFromDomain(RpcDomainId) returns (RpcDomainIdCount) {}
rpc countLinksToDomain(RpcDomainId) returns (RpcDomainIdCount) {}
}
// A single domain id, as used in EC_DOMAIN.
message RpcDomainId {
int32 domainId = 1;
}
message RpcDomainIdList {
repeated int32 domainId = 1 [packed=true];
}
message RpcDomainIdCount {
int32 idCount = 1;
}
// Parallel arrays: sourceIds[i] links to destIds[i].
message RpcDomainIdPairs {
repeated int32 sourceIds = 1 [packed=true];
repeated int32 destIds = 2 [packed=true];
}
service QueryApi { service QueryApi {
rpc query(RpcQsQuery) returns (RpcQsResponse) {} rpc query(RpcQsQuery) returns (RpcQsResponse) {}
} }

View File

@ -20,8 +20,10 @@ dependencies {
implementation libs.bundles.slf4j implementation libs.bundles.slf4j
implementation libs.roaringbitmap
implementation libs.prometheus implementation libs.prometheus
implementation libs.notnull implementation libs.notnull
implementation libs.trove
implementation libs.guice implementation libs.guice
implementation libs.rxjava implementation libs.rxjava
implementation libs.gson implementation libs.gson

View File

@ -2,24 +2,33 @@ package nu.marginalia.query.client;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import gnu.trove.list.array.TIntArrayList;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder; import io.grpc.ManagedChannelBuilder;
import io.prometheus.client.Summary; import io.prometheus.client.Summary;
import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context; import nu.marginalia.client.Context;
import nu.marginalia.index.api.Empty;
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
import nu.marginalia.index.api.QueryApiGrpc; import nu.marginalia.index.api.QueryApiGrpc;
import nu.marginalia.index.api.RpcDomainId;
import nu.marginalia.index.client.model.query.SearchSpecification; import nu.marginalia.index.client.model.query.SearchSpecification;
import nu.marginalia.index.client.model.results.SearchResultSet; import nu.marginalia.index.client.model.results.SearchResultSet;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.query.QueryProtobufCodec; import nu.marginalia.query.QueryProtobufCodec;
import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryParams;
import nu.marginalia.query.model.QueryResponse; import nu.marginalia.query.model.QueryResponse;
import nu.marginalia.service.descriptor.ServiceDescriptor;
import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId; import nu.marginalia.service.id.ServiceId;
import org.roaringbitmap.PeekableCharIterator;
import org.roaringbitmap.longlong.PeekableLongIterator;
import org.roaringbitmap.longlong.Roaring64Bitmap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.CheckReturnValue; import javax.annotation.CheckReturnValue;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -36,13 +45,15 @@ public class QueryClient extends AbstractDynamicClient {
.register(); .register();
private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>(); private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>();
private final Map<ServiceAndNode, QueryApiGrpc.QueryApiBlockingStub > queryApis = new ConcurrentHashMap<>(); private final Map<ServiceAndNode, QueryApiGrpc.QueryApiBlockingStub > queryIndexApis = new ConcurrentHashMap<>();
private final Map<ServiceAndNode, IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> domainLinkApis = new ConcurrentHashMap<>();
record ServiceAndNode(String service, int node) { record ServiceAndNode(String service, int node) {
public String getHostName() { public String getHostName() {
return service; return service;
} }
} }
private ManagedChannel getChannel(ServiceAndNode serviceAndNode) { private ManagedChannel getChannel(ServiceAndNode serviceAndNode) {
return channels.computeIfAbsent(serviceAndNode, return channels.computeIfAbsent(serviceAndNode,
san -> ManagedChannelBuilder san -> ManagedChannelBuilder
@ -52,13 +63,21 @@ public class QueryClient extends AbstractDynamicClient {
} }
public QueryApiGrpc.QueryApiBlockingStub queryApi(int node) { public QueryApiGrpc.QueryApiBlockingStub queryApi(int node) {
return queryApis.computeIfAbsent(new ServiceAndNode("query-service", node), n -> return queryIndexApis.computeIfAbsent(new ServiceAndNode("query-service", node), n ->
QueryApiGrpc.newBlockingStub( QueryApiGrpc.newBlockingStub(
getChannel(n) getChannel(n)
) )
); );
} }
/** Returns a blocking stub for the domain-links API on the given query-service
 *  node, creating and caching it (and its channel) on first use. */
public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub domainApi(int node) {
return domainLinkApis.computeIfAbsent(new ServiceAndNode("query-service", node), n ->
IndexDomainLinksApiGrpc.newBlockingStub(
getChannel(n)
)
);
}
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject @Inject
@ -66,6 +85,9 @@ public class QueryClient extends AbstractDynamicClient {
super(descriptors.forId(ServiceId.Query), GsonFactory::get); super(descriptors.forId(ServiceId.Query), GsonFactory::get);
} }
/** No-arg constructor wiring a fixed descriptor for the query-service,
 *  for use where no ServiceDescriptors registry is injected. */
public QueryClient() {
super(new ServiceDescriptor(ServiceId.Query, "query-service"), GsonFactory::get);
}
/** Delegate an Index API style query directly to the index service */ /** Delegate an Index API style query directly to the index service */
@CheckReturnValue @CheckReturnValue
@ -82,4 +104,101 @@ public class QueryClient extends AbstractDynamicClient {
); );
} }
/** Fetches every domain link pair from node 0's streaming API and collects
 *  them into an in-memory AllLinks structure. */
public AllLinks getAllDomainLinks() {
AllLinks links = new AllLinks();
// Pairs arrive as parallel sourceIds/destIds arrays; the counts are
// presumably equal -- only destIds' count is checked here.
domainApi(0).getAllLinks(Empty.newBuilder().build()).forEachRemaining(pairs -> {
for (int i = 0; i < pairs.getDestIdsCount(); i++) {
links.add(pairs.getSourceIds(i), pairs.getDestIds(i));
}
});
return links;
}
/** Fetches the ids of all domains that link to {@code domainId}.
 *  Returns an empty list if the API call fails. */
public List<Integer> getLinksToDomain(int domainId) {
    try {
        var request = RpcDomainId.newBuilder()
                .setDomainId(domainId)
                .build();

        return domainApi(0)
                .getLinksToDomain(request)
                .getDomainIdList();
    }
    catch (Exception e) {
        logger.error("API Exception", e);
        return List.of();
    }
}
/** Fetches the ids of all domains that {@code domainId} links to.
 *  Returns an empty list if the API call fails. */
public List<Integer> getLinksFromDomain(int domainId) {
    try {
        var request = RpcDomainId.newBuilder()
                .setDomainId(domainId)
                .build();

        return domainApi(0)
                .getLinksFromDomain(request)
                .getDomainIdList();
    }
    catch (Exception e) {
        logger.error("API Exception", e);
        return List.of();
    }
}
/** Counts incoming links to {@code domainId}; returns 0 if the API call fails. */
public int countLinksToDomain(int domainId) {
    try {
        var request = RpcDomainId.newBuilder()
                .setDomainId(domainId)
                .build();

        return domainApi(0)
                .countLinksToDomain(request)
                .getIdCount();
    }
    catch (Exception e) {
        logger.error("API Exception", e);
        return 0;
    }
}
/** Counts outgoing links from {@code domainId}; returns 0 if the API call fails. */
public int countLinksFromDomain(int domainId) {
    try {
        var request = RpcDomainId.newBuilder()
                .setDomainId(domainId)
                .build();

        return domainApi(0)
                .countLinksFromDomain(request)
                .getIdCount();
    }
    catch (Exception e) {
        logger.error("API Exception", e);
        return 0;
    }
}
/** In-memory set of domain link pairs. Each pair is packed into a single long
 *  as (source &lt;&lt; 32 | dest) and stored in a Roaring64Bitmap for compactness. */
public static class AllLinks {
private final Roaring64Bitmap sourceToDest = new Roaring64Bitmap();
// Widening via toUnsignedLong avoids sign extension corrupting the packed value.
public void add(int source, int dest) {
sourceToDest.add(Integer.toUnsignedLong(source) << 32 | Integer.toUnsignedLong(dest));
}
public Iterator iterator() {
return new Iterator();
}
/** Cursor-style iterator: call advance() and, when it returns true,
 *  read the current pair via source() and dest(). */
public class Iterator {
private final PeekableLongIterator base = sourceToDest.getLongIterator();
long val = Long.MIN_VALUE;
public boolean advance() {
if (base.hasNext()) {
val = base.next();
return true;
}
return false;
}
// High 32 bits of the packed pair.
public int source() {
return (int) (val >>> 32);
}
// Low 32 bits of the packed pair.
public int dest() {
return (int) (val & 0xFFFF_FFFFL);
}
}
}
} }

View File

@ -16,6 +16,7 @@ configurations {
dependencies { dependencies {
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:service')
implementation libs.bundles.slf4j implementation libs.bundles.slf4j
@ -23,6 +24,7 @@ dependencies {
implementation libs.bundles.gson implementation libs.bundles.gson
implementation libs.notnull implementation libs.notnull
implementation libs.bundles.mariadb
implementation libs.sqlite implementation libs.sqlite
implementation libs.commons.lang3 implementation libs.commons.lang3

View File

@ -1,11 +1,30 @@
The link database contains information about links, ## Domain Link Database
The domain link database contains information about links
between domains. It is a static in-memory database loaded
from a binary file.
* [DomainLinkDb](src/main/java/nu/marginalia/linkdb/DomainLinkDb.java)
* * [FileDomainLinkDb](src/main/java/nu/marginalia/linkdb/FileDomainLinkDb.java)
* * [SqlDomainLinkDb](src/main/java/nu/marginalia/linkdb/SqlDomainLinkDb.java)
* [DomainLinkDbWriter](src/main/java/nu/marginalia/linkdb/DomainLinkDbWriter.java)
* [DomainLinkDbLoader](src/main/java/nu/marginalia/linkdb/DomainLinkDbLoader.java)
## Document Database
The document database contains information about links,
such as their ID, their URL, their title, their description, such as their ID, their URL, their title, their description,
and so forth. and so forth.
The link database is a sqlite file. The reason this information The document database is a sqlite file. The reason this information
is not in the MariaDB database is that this would make updates to is not in the MariaDB database is that this would make updates to
this information take effect in production immediately, even before this information take effect in production immediately, even before
the information was searchable. the information was searchable.
It is constructed by the [loading-process](../../processes/loading-process), and consumed * [DocumentLinkDbWriter](src/main/java/nu/marginalia/linkdb/DocumentDbWriter.java)
by the [index-service](../../services-core/index-service). * [DocumentLinkDbLoader](src/main/java/nu/marginalia/linkdb/DocumentDbReader.java)
## See Also
These databases are constructed by the [loading-process](../../processes/loading-process), and consumed by the [index-service](../../services-core/index-service).

View File

@ -4,7 +4,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import gnu.trove.list.TLongList; import gnu.trove.list.TLongList;
import nu.marginalia.linkdb.model.LdbUrlDetail; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -23,21 +23,21 @@ import java.util.ArrayList;
import java.util.List; import java.util.List;
@Singleton @Singleton
public class LinkdbReader { public class DocumentDbReader {
private final Path dbFile; private final Path dbFile;
private volatile Connection connection; private volatile Connection connection;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject @Inject
public LinkdbReader(@Named("linkdb-file") Path dbFile) throws SQLException { public DocumentDbReader(@Named("docdb-file") Path dbFile) throws SQLException {
this.dbFile = dbFile; this.dbFile = dbFile;
if (Files.exists(dbFile)) { if (Files.exists(dbFile)) {
connection = createConnection(); connection = createConnection();
} }
else { else {
logger.warn("No linkdb file {}", dbFile); logger.warn("No docdb file {}", dbFile);
} }
} }
@ -107,8 +107,8 @@ public class LinkdbReader {
return ret; return ret;
} }
public List<LdbUrlDetail> getUrlDetails(TLongList ids) throws SQLException { public List<DocdbUrlDetail> getUrlDetails(TLongList ids) throws SQLException {
List<LdbUrlDetail> ret = new ArrayList<>(ids.size()); List<DocdbUrlDetail> ret = new ArrayList<>(ids.size());
if (connection == null || if (connection == null ||
connection.isClosed()) connection.isClosed())
@ -126,7 +126,7 @@ public class LinkdbReader {
var rs = stmt.executeQuery(); var rs = stmt.executeQuery();
if (rs.next()) { if (rs.next()) {
var url = new EdgeUrl(rs.getString("URL")); var url = new EdgeUrl(rs.getString("URL"));
ret.add(new LdbUrlDetail( ret.add(new DocdbUrlDetail(
rs.getLong("ID"), rs.getLong("ID"),
url, url,
rs.getString("TITLE"), rs.getString("TITLE"),

View File

@ -1,24 +1,23 @@
package nu.marginalia.linkdb; package nu.marginalia.linkdb;
import nu.marginalia.linkdb.model.LdbUrlDetail; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Types;
import java.util.List; import java.util.List;
public class LinkdbWriter { public class DocumentDbWriter {
private final Connection connection; private final Connection connection;
public LinkdbWriter(Path outputFile) throws SQLException { public DocumentDbWriter(Path outputFile) throws SQLException {
String connStr = "jdbc:sqlite:" + outputFile.toString(); String connStr = "jdbc:sqlite:" + outputFile.toString();
connection = DriverManager.getConnection(connStr); connection = DriverManager.getConnection(connStr);
try (var stream = ClassLoader.getSystemResourceAsStream("db/linkdb-document.sql"); try (var stream = ClassLoader.getSystemResourceAsStream("db/docdb-document.sql");
var stmt = connection.createStatement() var stmt = connection.createStatement()
) { ) {
var sql = new String(stream.readAllBytes()); var sql = new String(stream.readAllBytes());
@ -31,11 +30,11 @@ public class LinkdbWriter {
} }
} }
public void add(LdbUrlDetail ldbUrlDetail) throws SQLException { public void add(DocdbUrlDetail docdbUrlDetail) throws SQLException {
add(List.of(ldbUrlDetail)); add(List.of(docdbUrlDetail));
} }
public void add(List<LdbUrlDetail> ldbUrlDetail) throws SQLException { public void add(List<DocdbUrlDetail> docdbUrlDetail) throws SQLException {
try (var stmt = connection.prepareStatement(""" try (var stmt = connection.prepareStatement("""
INSERT OR IGNORE INTO DOCUMENT(ID, URL, TITLE, DESCRIPTION, WORDS_TOTAL, FORMAT, FEATURES, DATA_HASH, QUALITY, PUB_YEAR) INSERT OR IGNORE INTO DOCUMENT(ID, URL, TITLE, DESCRIPTION, WORDS_TOTAL, FORMAT, FEATURES, DATA_HASH, QUALITY, PUB_YEAR)
@ -43,7 +42,7 @@ public class LinkdbWriter {
""")) { """)) {
int i = 0; int i = 0;
for (var document : ldbUrlDetail) { for (var document : docdbUrlDetail) {
var url = document.url(); var url = document.url();
stmt.setLong(1, document.urlId()); stmt.setLong(1, document.urlId());

View File

@ -0,0 +1,39 @@
package nu.marginalia.linkdb;
import gnu.trove.list.array.TIntArrayList;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Arrays;
/** A database of source-destination pairs of domain IDs. The database is loaded into memory from
 * a source. The database is then kept in memory, reloading it upon switchInput().
 */
public interface DomainLinkDb {
    /** Replace the current db file with the provided file. The provided file will be deleted.
     * The in-memory database MAY be updated to reflect the change.
     * */
    void switchInput(Path filename) throws Exception;

    /** Find all destinations for the given source. */
    TIntArrayList findDestinations(int source);

    /** Count the number of destinations for the given source. */
    int countDestinations(int source);

    /** Find all sources for the given destination. */
    TIntArrayList findSources(int dest);

    /** Count the number of sources for the given destination. */
    int countSources(int dest);

    /** Iterate over all source-destination pairs, in unspecified order. */
    void forEach(SourceDestConsumer consumer);

    /** Callback for {@link #forEach(SourceDestConsumer)}. */
    interface SourceDestConsumer {
        void accept(int source, int dest);
    }
}

View File

@ -0,0 +1,45 @@
package nu.marginalia.linkdb;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
/** Sequential reader for the domain links file: a flat sequence of big-endian
 * longs, each packing (source domain id &lt;&lt; 32 | dest domain id).
 */
public class DomainLinkDbLoader implements AutoCloseable {
    private final DataInputStream stream;
    private final Path filename;
    private long nextVal;

    public DomainLinkDbLoader(Path filename) throws IOException {
        this.stream = new DataInputStream(Files.newInputStream(filename));
        this.filename = filename;
    }

    /** Number of (source, dest) pairs in the file; each pair is 8 bytes. */
    public int size() throws IOException {
        return (int) (Files.size(filename) / 8);
    }

    /** Advance to the next pair. Returns false at end of file.
     * @throws UncheckedIOException on a genuine read error (previously such
     *         errors were silently treated as end-of-file, truncating the load)
     */
    public boolean next() {
        try {
            nextVal = stream.readLong();
            return true;
        }
        catch (EOFException ex) {
            return false; // normal end of data
        }
        catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    /** Source domain id of the current pair (high 32 bits). */
    public int getSource() {
        return (int) (nextVal >>> 32);
    }

    /** Destination domain id of the current pair (low 32 bits). */
    public int getDest() {
        return (int) (nextVal & 0xffff_ffffL);
    }

    @Override
    public void close() throws IOException {
        stream.close();
    }
}

View File

@ -0,0 +1,29 @@
package nu.marginalia.linkdb;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
/** Writes the domain links file: a flat sequence of big-endian longs, each
 * packing (source domain id &lt;&lt; 32 | dest domain id). Truncates any
 * existing file at the destination path.
 */
public class DomainLinkDbWriter implements AutoCloseable {
    private final DataOutputStream stream;

    public DomainLinkDbWriter(Path fileName) throws IOException {
        var outputStream = Files.newOutputStream(fileName,
                StandardOpenOption.CREATE,
                StandardOpenOption.WRITE,
                StandardOpenOption.TRUNCATE_EXISTING);

        this.stream = new DataOutputStream(outputStream);
    }

    /** Append one link. Both ids are widened without sign extension before packing. */
    public void write(int sourceDomainId, int destDomainId) throws IOException {
        long packed = Integer.toUnsignedLong(sourceDomainId) << 32
                    | Integer.toUnsignedLong(destDomainId);

        stream.writeLong(packed);
    }

    @Override
    public void close() throws IOException {
        stream.close();
    }
}

View File

@ -0,0 +1,125 @@
package nu.marginalia.linkdb;
import com.google.inject.name.Named;
import gnu.trove.list.array.TIntArrayList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
/** Canonical DomainLinkDb implementation. The database is loaded into memory from
 * a file. The database is then kept in memory, reloading it upon switchInput().
 */
public class FileDomainLinkDb implements DomainLinkDb {
    private static final Logger logger = LoggerFactory.getLogger(FileDomainLinkDb.class);
    private final Path filename;

    // Each long packs a link as (source << 32 | dest). Both arrays hold the
    // same links, keyed from opposite ends, and are sorted so all entries for
    // a given key form a contiguous run locatable by binary search.
    // volatile: replaced wholesale by loadInput(), read by query methods.
    private volatile long[] sourceToDest = new long[0];
    private volatile long[] destToSource = new long[0];

    public FileDomainLinkDb(@Named("domain-linkdb-file") Path filename) throws IOException {
        this.filename = filename;

        if (Files.exists(filename)) {
            // Load directly; going through switchInput() here would pointlessly
            // move the file onto itself.
            loadInput(filename);
        }
    }

    @Override
    public void switchInput(Path newFilename) throws IOException {
        Files.move(newFilename, filename, StandardCopyOption.REPLACE_EXISTING);
        loadInput(filename);
    }

    /** Load the entire file into the two sorted in-memory arrays. */
    public void loadInput(Path filename) throws IOException {
        try (var loader = new DomainLinkDbLoader(filename)) {
            int size = loader.size();

            var newSourceToDest = new long[size];
            var newDestToSource = new long[size];

            int i = 0;
            while (loader.next()) {
                // Widen without sign extension, matching SqlDomainLinkDb and
                // DomainLinkDbWriter; a plain int->long assignment would
                // sign-extend a negative id and corrupt the packed pair.
                long source = Integer.toUnsignedLong(loader.getSource());
                long dest = Integer.toUnsignedLong(loader.getDest());

                newSourceToDest[i] = (source << 32) | dest;
                newDestToSource[i] = (dest << 32) | source;

                i++;
            }

            Arrays.sort(newSourceToDest);
            Arrays.sort(newDestToSource);

            sourceToDest = newSourceToDest;
            destToSource = newDestToSource;
        }
    }

    @Override
    public TIntArrayList findDestinations(int source) {
        return findRelated(sourceToDest, source);
    }

    @Override
    public TIntArrayList findSources(int dest) {
        return findRelated(destToSource, dest);
    }

    @Override
    public int countDestinations(int source) {
        return countRelated(sourceToDest, source);
    }

    @Override
    public int countSources(int dest) {
        return countRelated(destToSource, dest);
    }

    @Override
    public void forEach(SourceDestConsumer consumer) {
        for (long val : sourceToDest) {
            consumer.accept((int) (val >>> 32), (int) (val & 0xFFFF_FFFFL));
        }
    }

    /** All values whose high word equals key, i.e. the range [key<<32, (key+1)<<32).
     *  Assumes domain ids are non-negative, so signed long ordering matches. */
    private TIntArrayList findRelated(long[] range, int key) {
        long keyLong = Integer.toUnsignedLong(key) << 32;
        long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32;

        int start = Arrays.binarySearch(range, keyLong);
        if (start < 0) {
            // Key is not found, get the insertion point
            start = -start - 1;
        }

        TIntArrayList result = new TIntArrayList();
        for (int i = start; i < range.length && range[i] < nextKeyLong; i++) {
            result.add((int) (range[i] & 0xFFFF_FFFFL));
        }
        return result;
    }

    /** Size of the contiguous run for key; same search as findRelated, without materializing it. */
    private int countRelated(long[] range, int key) {
        long keyLong = Integer.toUnsignedLong(key) << 32;
        long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32;

        int start = Arrays.binarySearch(range, keyLong);
        if (start < 0) {
            // Key is not found, get the insertion point
            start = -start - 1;
        }

        int num = 0;
        for (int i = start; i < range.length && range[i] < nextKeyLong; i++, num++);
        return num;
    }
}

View File

@ -0,0 +1,7 @@
package nu.marginalia.linkdb;
/** Well-known file names for the link/document databases kept in each index node. */
public class LinkdbFileNames {
    /** Pre-migration name of the document database (renamed to documents.db). */
    public static final String DEPRECATED_LINKDB_FILE_NAME = "links.db";

    /** Sqlite database of document details. */
    public static final String DOCDB_FILE_NAME = "documents.db";

    /** Flat file of packed source/dest domain id pairs. */
    public static final String DOMAIN_LINKS_FILE_NAME = "domain-links.dat";

    private LinkdbFileNames() {} // static constants only; not instantiable
}

View File

@ -0,0 +1,158 @@
package nu.marginalia.linkdb;
import com.google.inject.name.Named;
import com.zaxxer.hikari.HikariDataSource;
import gnu.trove.list.array.TIntArrayList;
import gnu.trove.list.array.TLongArrayList;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
/** DomainLinkDb implementation that goes through the motions of
* being a File-backed DomainLinkDb, but actually uses the legacy SQL database
* for loading the data.
* <p>
* This is part of the migration path to using FileDomainLinkDb.
*/
public class SqlDomainLinkDb implements DomainLinkDb {
// Each long packs a link as (source << 32 | dest); arrays are sorted so all
// entries for a key form a contiguous run. volatile: swapped wholesale by loadDb().
private volatile long[] sourceToDest = new long[0];
private volatile long[] destToSource = new long[0];
private static final Logger logger = LoggerFactory.getLogger(SqlDomainLinkDb.class);
private final Path filename;
private final HikariDataSource dataSource;
private final int node;
public SqlDomainLinkDb(@Named("domain-linkdb-file") Path filename,
HikariDataSource dataSource,
ServiceConfiguration configuration)
{
this.filename = filename;
this.dataSource = dataSource;
node = configuration.node();
// Load asynchronously so construction doesn't block on the SQL table scan;
// queries against the empty arrays return nothing until the load finishes.
// NOTE(review): `this` escapes to the thread before the constructor returns --
// safe here only because loadDb touches just the volatile arrays; confirm.
Thread.ofPlatform().start(() -> {
try {
loadDb();
} catch (Exception e) {
logger.error("Failed to load linkdb", e);
}
});
}
// The new file is accepted (kept for the migration path) but the in-memory
// data is still refreshed from SQL, per this implementation's contract.
@Override
public void switchInput(Path newFilename) throws IOException {
Files.move(newFilename, filename, StandardCopyOption.REPLACE_EXISTING);
loadDb();
}
/** (Re)load all links for this node from the legacy EC_DOMAIN_LINK table. */
public void loadDb() {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement(
STR."""
SELECT
SOURCE_DOMAIN_ID,
DEST_DOMAIN_ID
FROM EC_DOMAIN_LINK
INNER JOIN EC_DOMAIN
ON EC_DOMAIN.ID = EC_DOMAIN_LINK.SOURCE_DOMAIN_ID
WHERE NODE_AFFINITY=\{node}
""");
var rs = stmt.executeQuery())
{
// Pre-sized to roughly the expected row count to limit re-allocations.
TLongArrayList sourceToDest = new TLongArrayList(10_000_000);
TLongArrayList destToSource = new TLongArrayList(10_000_000);
while (rs.next()) {
// Widen without sign extension so the packed pair's high word is intact.
long source = Integer.toUnsignedLong(rs.getInt(1));
long dest = Integer.toUnsignedLong(rs.getInt(2));
sourceToDest.add((source << 32) | dest);
destToSource.add((dest << 32) | source);
}
sourceToDest.sort();
destToSource.sort();
// Publish atomically via the volatile fields.
this.sourceToDest = sourceToDest.toArray();
this.destToSource = destToSource.toArray();
}
catch (Exception ex) {
logger.error("Failed to load linkdb", ex);
}
// NOTE(review): this logs "loaded" (with the stale size) even when the
// query above failed and was caught -- consider moving into the try block.
logger.info("LinkDB loaded, size = {}", sourceToDest.length);
}
@Override
public TIntArrayList findDestinations(int source) {
return findRelated(sourceToDest, source);
}
@Override
public TIntArrayList findSources(int dest) {
return findRelated(destToSource, dest);
}
@Override
public int countDestinations(int source) {
return countRelated(sourceToDest, source);
}
@Override
public int countSources(int dest) {
return countRelated(destToSource, dest);
}
@Override
public void forEach(SourceDestConsumer consumer) {
for (long val : sourceToDest) {
consumer.accept((int) (val >>> 32), (int) (val & 0xFFFF_FFFFL));
}
}
// All low words whose high word equals key, i.e. the sorted range
// [key<<32, (key+1)<<32). Assumes ids are non-negative so signed ordering matches.
private TIntArrayList findRelated(long[] range, int key) {
long keyLong = Integer.toUnsignedLong(key) << 32;
long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32;
int start = Arrays.binarySearch(range, keyLong);
if (start < 0) {
// Key is not found, get the insertion point
start = -start - 1;
}
TIntArrayList result = new TIntArrayList();
for (int i = start; i < range.length && range[i] < nextKeyLong; i++) {
result.add((int) (range[i] & 0xFFFF_FFFFL));
}
return result;
}
// Size of the run for key; same search as findRelated without materializing it.
private int countRelated(long[] range, int key) {
long keyLong = Integer.toUnsignedLong(key) << 32;
long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32;
int start = Arrays.binarySearch(range, keyLong);
if (start < 0) {
// Key is not found, get the insertion point
start = -start - 1;
}
int num = 0;
for (int i = start; i < range.length && range[i] < nextKeyLong; i++, num++);
return num;
}
}

View File

@ -0,0 +1,18 @@
package nu.marginalia.linkdb.model;
import nu.marginalia.model.EdgeUrl;
/** A row of the documents database (documents.db), describing one indexed document.
 * Formerly named LdbUrlDetail. */
public record DocdbUrlDetail(long urlId,
EdgeUrl url,
String title,
String description,
double urlQuality,
String format,
int features,
Integer pubYear,
long dataHash,
int wordsTotal
)
{
}

View File

@ -1,18 +0,0 @@
package nu.marginalia.linkdb.model;
import nu.marginalia.model.EdgeUrl;
public record LdbUrlDetail(long urlId,
EdgeUrl url,
String title,
String description,
double urlQuality,
String format,
int features,
Integer pubYear,
long dataHash,
int wordsTotal
)
{
}

View File

@ -1,7 +1,7 @@
package nu.marginalia.linkdb; package nu.marginalia.linkdb;
import gnu.trove.list.array.TLongArrayList; import gnu.trove.list.array.TLongArrayList;
import nu.marginalia.linkdb.model.LdbUrlDetail; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -10,13 +10,13 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.SQLException; import java.sql.SQLException;
public class LinkdbWriterTest { public class DocumentDbWriterTest {
@Test @Test
public void testCreate() throws IOException { public void testCreate() throws IOException {
Path tempPath = Files.createTempFile("linkdb", ".db"); Path tempPath = Files.createTempFile("docdb", ".db");
try { try {
var writer = new LinkdbWriter(tempPath); var writer = new DocumentDbWriter(tempPath);
writer.add(new LdbUrlDetail( writer.add(new DocdbUrlDetail(
1, 1,
new nu.marginalia.model.EdgeUrl("http", new EdgeDomain("example.com"), null, "/", null), new nu.marginalia.model.EdgeUrl("http", new EdgeDomain("example.com"), null, "/", null),
"Test", "Test",
@ -30,7 +30,7 @@ public class LinkdbWriterTest {
)); ));
writer.close(); writer.close();
var reader = new LinkdbReader(tempPath); var reader = new DocumentDbReader(tempPath);
var deets = reader.getUrlDetails(new TLongArrayList(new long[]{1})); var deets = reader.getUrlDetails(new TLongArrayList(new long[]{1}));
System.out.println(deets); System.out.println(deets);
} catch (SQLException e) { } catch (SQLException e) {

View File

@ -0,0 +1,50 @@
package nu.marginalia.linkdb;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
/** Round-trip test for the domain-links writer/loader pair. */
public class DomainLinkDbTest {
    Path fileName;

    @BeforeEach
    public void setUp() throws IOException {
        fileName = Files.createTempFile("test", ".db");
    }

    @AfterEach
    public void tearDown() throws IOException {
        Files.deleteIfExists(fileName);
    }

    @Test
    public void testWriteRead() {
        // Write a handful of pairs, then verify they read back in order.
        int[][] pairs = { {1, 2}, {2, 3}, {3, 4} };

        try (var writer = new DomainLinkDbWriter(fileName)) {
            for (int[] pair : pairs) {
                writer.write(pair[0], pair[1]);
            }
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }

        try (var reader = new DomainLinkDbLoader(fileName)) {
            for (int[] pair : pairs) {
                Assertions.assertTrue(reader.next());
                Assertions.assertEquals(pair[0], reader.getSource());
                Assertions.assertEquals(pair[1], reader.getDest());
            }
            Assertions.assertFalse(reader.next());
        }
        catch (IOException ex) {
            throw new RuntimeException(ex);
        }
    }
}

View File

@ -17,6 +17,8 @@ dependencies {
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:common:service-client')
implementation project(':code:api:query-api')
implementation libs.bundles.slf4j implementation libs.bundles.slf4j
implementation libs.bundles.mariadb implementation libs.bundles.mariadb

View File

@ -5,6 +5,7 @@ import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.db.DomainBlacklistImpl; import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.model.crawl.DomainIndexingState; import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.query.client.QueryClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -15,14 +16,18 @@ import java.util.function.IntConsumer;
@Singleton @Singleton
public class RankingDomainFetcher { public class RankingDomainFetcher {
protected final HikariDataSource dataSource; protected final HikariDataSource dataSource;
private final QueryClient queryClient;
protected final DomainBlacklistImpl blacklist; protected final DomainBlacklistImpl blacklist;
protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Logger logger = LoggerFactory.getLogger(getClass());
protected boolean getNames = false; protected boolean getNames = false;
@Inject @Inject
public RankingDomainFetcher(HikariDataSource dataSource, DomainBlacklistImpl blacklist) { public RankingDomainFetcher(HikariDataSource dataSource,
QueryClient queryClient,
DomainBlacklistImpl blacklist) {
this.dataSource = dataSource; this.dataSource = dataSource;
this.queryClient = queryClient;
this.blacklist = blacklist; this.blacklist = blacklist;
} }
@ -33,10 +38,10 @@ public class RankingDomainFetcher {
public void getDomains(Consumer<RankingDomainData> consumer) { public void getDomains(Consumer<RankingDomainData> consumer) {
String query; String query;
if (getNames) { if (getNames) {
query = "SELECT EC_DOMAIN.ID,DOMAIN_NAME,DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID INNER JOIN EC_DOMAIN_LINK ON SOURCE_DOMAIN_ID=EC_DOMAIN.ID WHERE ((INDEXED>1 AND IS_ALIVE) OR (INDEXED=1 AND VISITED_URLS=KNOWN_URLS AND GOOD_URLS>0)) AND SOURCE_DOMAIN_ID!=DEST_DOMAIN_ID GROUP BY EC_DOMAIN.ID"; query = "SELECT EC_DOMAIN.ID,DOMAIN_NAME,DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID WHERE NODE_AFFINITY>0 GROUP BY EC_DOMAIN.ID";
} }
else { else {
query = "SELECT EC_DOMAIN.ID,\"\",DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID INNER JOIN EC_DOMAIN_LINK ON SOURCE_DOMAIN_ID=EC_DOMAIN.ID WHERE ((INDEXED>1 AND IS_ALIVE) OR (INDEXED=1 AND VISITED_URLS=KNOWN_URLS AND GOOD_URLS>0)) AND SOURCE_DOMAIN_ID!=DEST_DOMAIN_ID GROUP BY EC_DOMAIN.ID"; query = "SELECT EC_DOMAIN.ID,\"\",DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID WHERE NODE_AFFINITY>0 GROUP BY EC_DOMAIN.ID";
} }
getDomains(query, consumer); getDomains(query, consumer);
@ -77,23 +82,14 @@ public class RankingDomainFetcher {
} }
public void eachDomainLink(DomainLinkConsumer consumer) { public void eachDomainLink(DomainLinkConsumer consumer) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK"))
{
stmt.setFetchSize(10000);
var rsp = stmt.executeQuery(); var allLinks = queryClient.getAllDomainLinks();
var iter = allLinks.iterator();
while (rsp.next()) { while (iter.advance()) {
int src = rsp.getInt(1); consumer.accept(iter.source(), iter.dest());
int dst = rsp.getInt(2);
consumer.accept(src, dst);
}
}
catch (SQLException ex) {
logger.error("Failed to fetch domain links", ex);
} }
} }
public void domainsByPattern(String pattern, IntConsumer idConsumer) { public void domainsByPattern(String pattern, IntConsumer idConsumer) {

View File

@ -4,6 +4,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.db.DomainBlacklistImpl; import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.query.client.QueryClient;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.sql.SQLException; import java.sql.SQLException;
@ -14,8 +15,8 @@ public class RankingDomainFetcherForSimilarityData extends RankingDomainFetcher
final boolean hasData; final boolean hasData;
@Inject @Inject
public RankingDomainFetcherForSimilarityData(HikariDataSource dataSource, DomainBlacklistImpl blacklist) { public RankingDomainFetcherForSimilarityData(HikariDataSource dataSource, QueryClient queryClient, DomainBlacklistImpl blacklist) {
super(dataSource, blacklist); super(dataSource, queryClient, blacklist);
hasData = isDomainNeighborTablePopulated(dataSource); hasData = isDomainNeighborTablePopulated(dataSource);
} }
@ -61,17 +62,6 @@ public class RankingDomainFetcherForSimilarityData extends RankingDomainFetcher
} }
public void getDomains(Consumer<RankingDomainData> consumer) { public void getDomains(Consumer<RankingDomainData> consumer) {
// String query =
// """
// SELECT EC_DOMAIN.ID,DOMAIN_NAME,DOMAIN_ALIAS,STATE,COALESCE(KNOWN_URLS, 0)
// FROM EC_DOMAIN
// LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
// INNER JOIN EC_DOMAIN_LINK ON DEST_DOMAIN_ID=EC_DOMAIN.ID
// WHERE SOURCE_DOMAIN_ID!=DEST_DOMAIN_ID
// GROUP BY EC_DOMAIN.ID
// HAVING COUNT(SOURCE_DOMAIN_ID)>5
// """;
String query; String query;
if (getNames) { if (getNames) {
query = query =

View File

@ -1,71 +0,0 @@
package nu.marginalia.ranking.tool;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.ranking.StandardPageRank;
import nu.marginalia.ranking.accumulator.RankingResultListAccumulator;
import nu.marginalia.ranking.data.RankingDomainFetcherForSimilarityData;
import nu.marginalia.service.module.DatabaseModule;
import org.mariadb.jdbc.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
public class CreateBrowseDomainRanksTool {
private static final Logger logger = LoggerFactory.getLogger(CreateBrowseDomainRanksTool.class);
static final LinkedBlockingQueue<Integer> uploadQueue = new LinkedBlockingQueue<>(10);
volatile static boolean running = true;
@SneakyThrows
public static void main(String... args) {
Driver driver = new Driver();
var conn = new DatabaseModule().provideConnection();
long start = System.currentTimeMillis();
var uploader = new Thread(() -> uploadThread(conn), "Uploader");
logger.info("Ranking");
var ds = new DatabaseModule().provideConnection();
var domains = new RankingDomainFetcherForSimilarityData(ds, new DomainBlacklistImpl(ds));
var rpr = new StandardPageRank(domains, args);
uploader.start();
var rankData = rpr.pageRankWithPeripheralNodes(1000, RankingResultListAccumulator::new);
rankData.forEach(i -> {
try {
uploadQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
});
long end = System.currentTimeMillis();
running = false;
uploader.join();
logger.info("Done in {}", (end - start)/1000.0);
}
public static void uploadThread(HikariDataSource dataSource) {
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.prepareStatement("INSERT IGNORE INTO EC_RANDOM_DOMAINS(DOMAIN_SET, DOMAIN_ID) VALUES (3, ?)")) {
while (running || (!running && !uploadQueue.isEmpty())) {
var job = uploadQueue.take();
stmt.setInt(1, job);
stmt.executeUpdate();
}
}
} catch (SQLException | InterruptedException throwables) {
throwables.printStackTrace();
}
}
}

View File

@ -1,264 +0,0 @@
package nu.marginalia.ranking.tool;
import com.zaxxer.hikari.HikariDataSource;
import gnu.trove.list.TIntList;
import gnu.trove.list.array.TIntArrayList;
import gnu.trove.map.hash.TIntIntHashMap;
import gnu.trove.map.hash.TIntObjectHashMap;
import it.unimi.dsi.fastutil.ints.IntArrays;
import it.unimi.dsi.fastutil.ints.IntComparator;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import nu.marginalia.ranking.RankingAlgorithm;
import nu.marginalia.ranking.data.RankingDomainData;
import nu.marginalia.ranking.data.RankingDomainFetcher;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.service.module.DatabaseModule;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;
public class PerusePageRankV2 {
final TIntObjectHashMap<RankingDomainData> domainsById = new TIntObjectHashMap<>();
final TIntIntHashMap domainIndexToId = new TIntIntHashMap();
final TIntIntHashMap domainIdToIndex = new TIntIntHashMap();
TIntArrayList[] linkDataSrc2Dest;
TIntArrayList[] linkDataDest2Src;
private final Logger logger = LoggerFactory.getLogger(getClass());
static final LinkedBlockingQueue<LinkAdjacencies> uploadQueue = new LinkedBlockingQueue<>(10);
volatile static boolean running = true;
public int indexMax() {
return domainIndexToId.size();
}
public int getDomainId(int idx) {
return domainIndexToId.get(idx);
}
@SneakyThrows
public static void main(String... args) {
var ds = new DatabaseModule().provideConnection();
var blacklist = new DomainBlacklistImpl(ds);
var rank = new PerusePageRankV2(new RankingDomainFetcher(ds, blacklist));
long start = System.currentTimeMillis();
var uploader = new Thread(() -> uploadThread(ds));
uploader.start();
IntStream.range(0, rank.indexMax()).parallel().forEach(i -> {
int[] ids = rank.pageRank(i, 25).toArray();
try {
uploadQueue.put(new LinkAdjacencies(rank.getDomainId(i), ids));
} catch (InterruptedException e) {
e.printStackTrace();
}
});
long end = System.currentTimeMillis();
running = false;
uploader.join();
System.out.printf("%2.2f", (end - start)/1000.0);
}
@AllArgsConstructor
static class LinkAdjacencies {
public final int id;
public final int[] neighbors;
}
public static void uploadThread(HikariDataSource dataSource) {
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.prepareStatement("INSERT INTO EC_DOMAIN_NEIGHBORS(DOMAIN_ID, NEIGHBOR_ID, ADJ_IDX) VALUES (?,?,?) ON DUPLICATE KEY UPDATE NEIGHBOR_ID=VALUES(NEIGHBOR_ID)")) {
while (running || (!running && !uploadQueue.isEmpty())) {
var job = uploadQueue.take();
for (int i = 0; i < job.neighbors.length; i++) {
stmt.setInt(1, job.id);
stmt.setInt(2, job.neighbors[i]);
stmt.setInt(3, i);
stmt.addBatch();
}
stmt.executeBatch();
}
}
} catch (SQLException | InterruptedException throwables) {
throwables.printStackTrace();
}
}
public PerusePageRankV2(RankingDomainFetcher domainFetcher) {
domainFetcher.getDomains(domainData -> {
int id = domainData.id;
domainsById.put(id, domainData);
domainIndexToId.put(domainIndexToId.size(), id);
domainIdToIndex.put(id, domainIdToIndex.size());
});
domainFetcher.getPeripheralDomains(domainData -> {
int id = domainData.id;
domainsById.put(id, domainData);
domainIndexToId.put(domainIndexToId.size(), id);
domainIdToIndex.put(id, domainIdToIndex.size());
});
linkDataSrc2Dest = new TIntArrayList[domainIndexToId.size()];
linkDataDest2Src = new TIntArrayList[domainIndexToId.size()];
domainFetcher.eachDomainLink((src, dst) -> {
if (src == dst) return;
if (domainsById.contains(src) && domainsById.contains(dst)) {
int srcIdx = domainIdToIndex.get(src);
int dstIdx = domainIdToIndex.get(domainsById.get(dst).resolveAlias());
if (linkDataSrc2Dest[srcIdx] == null) {
linkDataSrc2Dest[srcIdx] = new TIntArrayList();
}
linkDataSrc2Dest[srcIdx].add(dstIdx);
if (linkDataDest2Src[dstIdx] == null) {
linkDataDest2Src[dstIdx] = new TIntArrayList();
}
linkDataDest2Src[dstIdx].add(srcIdx);
}
});
}
public TIntList pageRank(int origin, int resultCount) {
RankVector rank = new RankVector(1.d / domainsById.size());
int iter_max = 10;
for (int i = 0; i < iter_max; i++) {
RankVector newRank = createNewRankVector(rank);
double oldNorm = rank.norm();
double newNorm = newRank.norm();
double dNorm = oldNorm - newNorm ;
newRank.increment(origin, dNorm/oldNorm);
rank = newRank;
}
rank.increment(origin, -1);
return rank.getRanking(resultCount);
}
@NotNull
private RankVector createNewRankVector(RankVector rank) {
double rankNorm = rank.norm();
RankVector newRank = new RankVector(0);
for (int domainId = 0; domainId < domainIndexToId.size(); domainId++) {
var links = linkDataSrc2Dest[domainId];
double newRankValue = 0;
if (links != null && links.size() > 0) {
for (int j = 0; j < links.size(); j++) {
var revLinks = linkDataDest2Src[links.getQuick(j)];
newRankValue += rank.get(links.getQuick(j)) / revLinks.size();
}
}
newRank.set(domainId, 0.85*newRankValue/rankNorm);
}
return newRank;
}
public class RankVector {
private final double[] rank;
public RankVector(double defaultValue) {
rank = new double[domainIndexToId.size()];
if (defaultValue != 0.) {
Arrays.fill(rank, defaultValue);
}
}
public void set(int id, double value) {
rank[id] = value;
}
public void increment(int id, double value) {
rank[id] += value;
}
public double get(int id) {
if (id >= rank.length) return 0.;
return rank[id];
}
public double norm() {
double v = 0.;
for (int i = 0; i < rank.length; i++) {
if (rank[i] > 0) { v+=rank[i]; }
else { v -= rank[i]; }
}
return v;
}
public double norm(RankingAlgorithm.RankVector other) {
double v = 0.;
for (int i = 0; i < rank.length; i++) {
double dv = rank[i] - other.get(i);
if (dv > 0) { v+=dv; }
else { v -= dv; }
}
return v;
}
public TIntList getRanking(int numResults) {
if (numResults < 0) {
numResults = domainIdToIndex.size();
}
TIntArrayList list = new TIntArrayList(numResults);
int[] nodes = new int[rank.length];
Arrays.setAll(nodes, i->i);
IntComparator comp = (i,j) -> (int) Math.signum(rank[j] - rank[i]);
IntArrays.quickSort(nodes, comp);
int i;
for (i = 0; i < numResults; i++) {
int id = domainIndexToId.get(nodes[i]);
if (!domainsById.get(id).isAlias())
list.add(id);
}
for (; i < nodes.length && domainsById.size() < numResults; i++) {
int id = domainIndexToId.get(nodes[i]);
if (!domainsById.get(id).isAlias())
list.add(id);
}
return list;
}
}
}

View File

@ -1,67 +0,0 @@
package nu.marginalia.ranking.tool;
import lombok.SneakyThrows;
import nu.marginalia.ranking.accumulator.RankingResultListAccumulator;
import nu.marginalia.ranking.data.RankingDomainFetcher;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.ranking.StandardPageRank;
import nu.marginalia.ranking.data.RankingDomainFetcherForSimilarityData;
import nu.marginalia.service.module.DatabaseModule;
import org.mariadb.jdbc.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
public class PrintDomainRanksTool {
private static final Logger logger = LoggerFactory.getLogger(PrintDomainRanksTool.class);
private volatile static int rankMax;
static final LinkedBlockingQueue<Integer> uploadQueue = new LinkedBlockingQueue<>(10);
volatile static boolean running = true;
@SneakyThrows
public static void main(String... args) {
Driver driver = new Driver();
var conn = new DatabaseModule().provideConnection();
long start = System.currentTimeMillis();
logger.info("Ranking");
var ds = new DatabaseModule().provideConnection();
RankingDomainFetcher domains;
if (Boolean.getBoolean("use-link-data")) {
domains = new RankingDomainFetcher(ds, new DomainBlacklistImpl(ds));
domains.retainNames();
}
else {
domains = new RankingDomainFetcherForSimilarityData(ds, new DomainBlacklistImpl(ds));
domains.retainNames();
}
var rpr = new StandardPageRank(domains, args);
rankMax = rpr.size();
var rankData = rpr.pageRankWithPeripheralNodes(rankMax, RankingResultListAccumulator::new);
AtomicInteger cnt = new AtomicInteger();
rankData.forEach(i -> {
var data = rpr.getDomainData(i);
System.out.printf("%d %s %s\n", cnt.getAndIncrement(), data.name, data.state);
return true;
});
long end = System.currentTimeMillis();
running = false;
logger.info("Done in {}", (end - start)/1000.0);
}
}

View File

@ -1,85 +0,0 @@
package nu.marginalia.ranking.tool;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.ranking.StandardPageRank;
import nu.marginalia.ranking.accumulator.RankingResultListAccumulator;
import nu.marginalia.ranking.data.RankingDomainFetcherForSimilarityData;
import nu.marginalia.db.DomainBlacklistImpl;
import nu.marginalia.service.module.DatabaseModule;
import org.mariadb.jdbc.Driver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.LinkedBlockingQueue;
public class UpdateDomainRanksTool {
private static final Logger logger = LoggerFactory.getLogger(UpdateDomainRanksTool.class);
private volatile static int rankMax;
static final LinkedBlockingQueue<Integer> uploadQueue = new LinkedBlockingQueue<>(10);
volatile static boolean running = true;
@SneakyThrows
public static void main(String... args) {
Driver driver = new Driver();
var conn = new DatabaseModule().provideConnection();
long start = System.currentTimeMillis();
var uploader = new Thread(() -> uploadThread(conn), "Uploader");
logger.info("Ranking");
var domains = new RankingDomainFetcherForSimilarityData(conn, new DomainBlacklistImpl(conn));
var rpr = new StandardPageRank(domains, "memex.marginalia.nu", "bikobatanari.art", "sadgrl.online", "wiki.xxiivv.com");
rankMax = rpr.size();
uploader.start();
var rankData = rpr.pageRankWithPeripheralNodes(rankMax, RankingResultListAccumulator::new);
rankData.forEach(i -> {
try {
uploadQueue.put(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
return true;
});
long end = System.currentTimeMillis();
running = false;
uploader.join();
logger.info("Done in {}", (end - start)/1000.0);
}
public static void uploadThread(HikariDataSource dataSource) {
int i = 0;
try (var conn = dataSource.getConnection()) {
logger.info("Resetting rank");
try (var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET RANK=1")) {
stmt.executeUpdate();
}
logger.info("Updating ranks");
try (var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET RANK=? WHERE ID=?")) {
while (running || (!running && !uploadQueue.isEmpty())) {
var job = uploadQueue.take();
stmt.setDouble(1, i++ / (double) rankMax);
stmt.setInt(2, job);
stmt.executeUpdate();
}
}
logger.info("Recalculating quality");
} catch (SQLException | InterruptedException throwables) {
throwables.printStackTrace();
}
}
}

View File

@ -3,7 +3,6 @@ package nu.marginalia.browse;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import gnu.trove.set.hash.TIntHashSet;
import nu.marginalia.browse.model.BrowseResult; import nu.marginalia.browse.model.BrowseResult;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;

View File

@ -1,132 +0,0 @@
package nu.marginalia.browse;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.browse.model.BrowseResult;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.db.DomainBlacklist;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.*;
@Singleton
public class DbBrowseDomainsSimilarOldAlgo {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final HikariDataSource dataSource;
@Inject
public DbBrowseDomainsSimilarOldAlgo(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
public List<BrowseResult> getDomainNeighborsAdjacent(int domainId, DomainBlacklist blacklist, int count) {
final Set<BrowseResult> domains = new HashSet<>(count*3);
final String q = """
SELECT EC_DOMAIN.ID AS NEIGHBOR_ID, DOMAIN_NAME, COUNT(*) AS CNT, INDEXED
FROM EC_DOMAIN_NEIGHBORS
INNER JOIN EC_DOMAIN ON NEIGHBOR_ID=EC_DOMAIN.ID
INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
INNER JOIN EC_DOMAIN_LINK ON DEST_DOMAIN_ID=EC_DOMAIN.ID
WHERE
STATE<2
AND KNOWN_URLS<1000
AND DOMAIN_ALIAS IS NULL
AND EC_DOMAIN_NEIGHBORS.DOMAIN_ID = ?
GROUP BY EC_DOMAIN.ID
HAVING CNT < 100
ORDER BY ADJ_IDX
LIMIT ?
""";
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(q)) {
stmt.setFetchSize(count);
stmt.setInt(1, domainId);
stmt.setInt(2, count);
var rsp = stmt.executeQuery();
while (rsp.next()) {
int id = rsp.getInt(1);
String domain = rsp.getString(2);
if (!blacklist.isBlacklisted(id)) {
domains.add(new BrowseResult(new EdgeDomain(domain).toRootUrl(), id, 0, rsp.getBoolean("INDEXED")));
}
}
}
if (domains.size() < count/2) {
final String q2 = """
SELECT EC_DOMAIN.ID, DOMAIN_NAME, INDEXED
FROM EC_DOMAIN
INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
INNER JOIN EC_DOMAIN_LINK B ON DEST_DOMAIN_ID=EC_DOMAIN.ID
INNER JOIN EC_DOMAIN_LINK O ON O.DEST_DOMAIN_ID=EC_DOMAIN.ID
WHERE B.SOURCE_DOMAIN_ID=?
AND STATE<2
AND KNOWN_URLS<1000
AND DOMAIN_ALIAS IS NULL
GROUP BY EC_DOMAIN.ID
HAVING COUNT(*) < 100 ORDER BY RANK ASC LIMIT ?""";
try (var stmt = connection.prepareStatement(q2)) {
stmt.setFetchSize(count/2);
stmt.setInt(1, domainId);
stmt.setInt(2, count/2 - domains.size());
var rsp = stmt.executeQuery();
while (rsp.next() && domains.size() < count/2) {
int id = rsp.getInt(1);
String domain = rsp.getString(2);
if (!blacklist.isBlacklisted(id)) {
domains.add(new BrowseResult(new EdgeDomain(domain).toRootUrl(), id, 0, rsp.getBoolean("INDEXED")));
}
}
}
}
if (domains.size() < count/2) {
final String q3 = """
SELECT EC_DOMAIN.ID, DOMAIN_NAME, INDEXED
FROM EC_DOMAIN
INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
INNER JOIN EC_DOMAIN_LINK B ON B.SOURCE_DOMAIN_ID=EC_DOMAIN.ID
INNER JOIN EC_DOMAIN_LINK O ON O.DEST_DOMAIN_ID=EC_DOMAIN.ID
WHERE B.DEST_DOMAIN_ID=?
AND STATE<2
AND KNOWN_URLS<1000
AND DOMAIN_ALIAS IS NULL
GROUP BY EC_DOMAIN.ID
HAVING COUNT(*) < 100
ORDER BY RANK ASC
LIMIT ?""";
try (var stmt = connection.prepareStatement(q3)) {
stmt.setFetchSize(count/2);
stmt.setInt(1, domainId);
stmt.setInt(2, count/2 - domains.size());
var rsp = stmt.executeQuery();
while (rsp.next() && domains.size() < count/2) {
int id = rsp.getInt(1);
String domain = rsp.getString(2);
if (!blacklist.isBlacklisted(id)) {
domains.add(new BrowseResult(new EdgeDomain(domain).toRootUrl(), id, 0, rsp.getBoolean("INDEXED")));
}
}
}
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
return new ArrayList<>(domains);
}
}

View File

@ -9,7 +9,7 @@ import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration; import nu.marginalia.ProcessConfiguration;
import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.ProcessConfigurationModule;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.linkdb.LinkdbWriter; import nu.marginalia.linkdb.DocumentDbWriter;
import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry;
@ -43,7 +43,7 @@ public class LoaderMain {
private final ProcessHeartbeatImpl heartbeat; private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory; private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService; private final FileStorageService fileStorageService;
private final LinkdbWriter linkdbWriter; private final DocumentDbWriter documentDbWriter;
private final LoaderIndexJournalWriter journalWriter; private final LoaderIndexJournalWriter journalWriter;
private final DomainLoaderService domainService; private final DomainLoaderService domainService;
private final DomainLinksLoaderService linksService; private final DomainLinksLoaderService linksService;
@ -77,7 +77,7 @@ public class LoaderMain {
public LoaderMain(ProcessHeartbeatImpl heartbeat, public LoaderMain(ProcessHeartbeatImpl heartbeat,
MessageQueueFactory messageQueueFactory, MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService, FileStorageService fileStorageService,
LinkdbWriter linkdbWriter, DocumentDbWriter documentDbWriter,
LoaderIndexJournalWriter journalWriter, LoaderIndexJournalWriter journalWriter,
DomainLoaderService domainService, DomainLoaderService domainService,
DomainLinksLoaderService linksService, DomainLinksLoaderService linksService,
@ -90,7 +90,7 @@ public class LoaderMain {
this.heartbeat = heartbeat; this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory; this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
this.linkdbWriter = linkdbWriter; this.documentDbWriter = documentDbWriter;
this.journalWriter = journalWriter; this.journalWriter = journalWriter;
this.domainService = domainService; this.domainService = domainService;
this.linksService = linksService; this.linksService = linksService;
@ -132,7 +132,7 @@ public class LoaderMain {
} }
finally { finally {
journalWriter.close(); journalWriter.close();
linkdbWriter.close(); documentDbWriter.close();
heartbeat.shutDown(); heartbeat.shutDown();
} }

View File

@ -9,8 +9,9 @@ import com.google.inject.name.Names;
import nu.marginalia.LanguageModels; import nu.marginalia.LanguageModels;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import nu.marginalia.IndexLocations; import nu.marginalia.IndexLocations;
import nu.marginalia.linkdb.DomainLinkDbWriter;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.linkdb.LinkdbWriter; import nu.marginalia.linkdb.DocumentDbWriter;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.SearchServiceDescriptors; import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.descriptor.ServiceDescriptors;
@ -20,6 +21,9 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.SQLException; import java.sql.SQLException;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME;
public class LoaderModule extends AbstractModule { public class LoaderModule extends AbstractModule {
public LoaderModule() { public LoaderModule() {
@ -34,14 +38,26 @@ public class LoaderModule extends AbstractModule {
} }
@Inject @Provides @Singleton @Inject @Provides @Singleton
private LinkdbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException { private DocumentDbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException {
// Migrate
Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve("links.db"); Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve(DOCDB_FILE_NAME);
if (Files.exists(dbPath)) { if (Files.exists(dbPath)) {
Files.delete(dbPath); Files.delete(dbPath);
} }
return new LinkdbWriter(dbPath); return new DocumentDbWriter(dbPath);
}
@Inject @Provides @Singleton
private DomainLinkDbWriter createDomainLinkdbWriter(FileStorageService service) throws SQLException, IOException {
Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve(DOMAIN_LINKS_FILE_NAME);
if (Files.exists(dbPath)) {
Files.delete(dbPath);
}
return new DomainLinkDbWriter(dbPath);
} }
private Gson createGson() { private Gson createGson() {

View File

@ -4,9 +4,8 @@ import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.io.processed.DocumentRecordParquetFileReader; import nu.marginalia.io.processed.DocumentRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames; import nu.marginalia.linkdb.DocumentDbWriter;
import nu.marginalia.linkdb.LinkdbWriter; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import nu.marginalia.linkdb.model.LdbUrlDetail;
import nu.marginalia.loading.LoaderInputData; import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
@ -26,11 +25,11 @@ import java.util.List;
public class DocumentLoaderService { public class DocumentLoaderService {
private static final Logger logger = LoggerFactory.getLogger(DocumentLoaderService.class); private static final Logger logger = LoggerFactory.getLogger(DocumentLoaderService.class);
private final LinkdbWriter linkdbWriter; private final DocumentDbWriter documentDbWriter;
@Inject @Inject
public DocumentLoaderService(LinkdbWriter linkdbWriter) { public DocumentLoaderService(DocumentDbWriter documentDbWriter) {
this.linkdbWriter = linkdbWriter; this.documentDbWriter = documentDbWriter;
} }
public boolean loadDocuments( public boolean loadDocuments(
@ -73,7 +72,7 @@ public class DocumentLoaderService {
class LinkdbLoader implements AutoCloseable { class LinkdbLoader implements AutoCloseable {
private final DomainIdRegistry domainIdRegistry; private final DomainIdRegistry domainIdRegistry;
private final List<LdbUrlDetail> details = new ArrayList<>(1000); private final List<DocdbUrlDetail> details = new ArrayList<>(1000);
LinkdbLoader(DomainIdRegistry domainIdRegistry) { LinkdbLoader(DomainIdRegistry domainIdRegistry) {
this.domainIdRegistry = domainIdRegistry; this.domainIdRegistry = domainIdRegistry;
@ -88,7 +87,7 @@ public class DocumentLoaderService {
projection.ordinal projection.ordinal
); );
details.add(new LdbUrlDetail( details.add(new DocdbUrlDetail(
urlId, urlId,
new EdgeUrl(projection.url), new EdgeUrl(projection.url),
projection.title, projection.title,
@ -102,7 +101,7 @@ public class DocumentLoaderService {
)); ));
if (details.size() > 100) { if (details.size() > 100) {
linkdbWriter.add(details); documentDbWriter.add(details);
details.clear(); details.clear();
} }
@ -111,7 +110,7 @@ public class DocumentLoaderService {
@Override @Override
public void close() throws SQLException { public void close() throws SQLException {
if (!details.isEmpty()) { if (!details.isEmpty()) {
linkdbWriter.add(details); documentDbWriter.add(details);
} }
} }
} }

View File

@ -2,10 +2,9 @@ package nu.marginalia.loading.links;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader; import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames; import nu.marginalia.linkdb.DomainLinkDbWriter;
import nu.marginalia.loading.LoaderInputData; import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.processed.DomainLinkRecord; import nu.marginalia.model.processed.DomainLinkRecord;
@ -15,28 +14,22 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@Singleton @Singleton
public class DomainLinksLoaderService { public class DomainLinksLoaderService {
private final HikariDataSource dataSource;
private static final Logger logger = LoggerFactory.getLogger(DomainLinksLoaderService.class); private static final Logger logger = LoggerFactory.getLogger(DomainLinksLoaderService.class);
private final int nodeId;
private final DomainLinkDbWriter domainLinkDbWriter;
@Inject @Inject
public DomainLinksLoaderService(HikariDataSource dataSource, public DomainLinksLoaderService(DomainLinkDbWriter domainLinkDbWriter) {
ProcessConfiguration processConfiguration) { this.domainLinkDbWriter = domainLinkDbWriter;
this.dataSource = dataSource;
this.nodeId = processConfiguration.node();
} }
public boolean loadLinks(DomainIdRegistry domainIdRegistry, public boolean loadLinks(DomainIdRegistry domainIdRegistry,
ProcessHeartbeat heartbeat, ProcessHeartbeat heartbeat,
LoaderInputData inputData) throws IOException, SQLException { LoaderInputData inputData) throws IOException {
dropLinkData();
try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS")) { try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS")) {
var linkFiles = inputData.listDomainLinkFiles(); var linkFiles = inputData.listDomainLinkFiles();
@ -56,17 +49,7 @@ public class DomainLinksLoaderService {
return true; return true;
} }
private void dropLinkData() throws SQLException { private void loadLinksFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException {
logger.info("Clearing EC_DOMAIN_LINK");
try (var conn = dataSource.getConnection();
var call = conn.prepareCall("CALL PURGE_LINKS_TABLE(?)")) {
call.setInt(1, nodeId);
call.executeUpdate();
}
}
private void loadLinksFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException, SQLException {
try (var domainStream = DomainLinkRecordParquetFileReader.stream(file); try (var domainStream = DomainLinkRecordParquetFileReader.stream(file);
var linkLoader = new LinkLoader(domainIdRegistry)) var linkLoader = new LinkLoader(domainIdRegistry))
{ {
@ -76,49 +59,21 @@ public class DomainLinksLoaderService {
} }
class LinkLoader implements AutoCloseable { class LinkLoader implements AutoCloseable {
private final Connection connection;
private final PreparedStatement insertStatement;
private final DomainIdRegistry domainIdRegistry; private final DomainIdRegistry domainIdRegistry;
private int batchSize = 0; public LinkLoader(DomainIdRegistry domainIdRegistry) {
private int total = 0;
public LinkLoader(DomainIdRegistry domainIdRegistry) throws SQLException {
this.domainIdRegistry = domainIdRegistry; this.domainIdRegistry = domainIdRegistry;
connection = dataSource.getConnection();
insertStatement = connection.prepareStatement("""
INSERT IGNORE INTO EC_DOMAIN_LINK(SOURCE_DOMAIN_ID, DEST_DOMAIN_ID)
VALUES (?, ?)
""");
} }
@SneakyThrows
void accept(DomainLinkRecord record) { void accept(DomainLinkRecord record) {
try { domainLinkDbWriter.write(
insertStatement.setInt(1, domainIdRegistry.getDomainId(record.source)); domainIdRegistry.getDomainId(record.source),
insertStatement.setInt(2, domainIdRegistry.getDomainId(record.dest)); domainIdRegistry.getDomainId(record.dest)
insertStatement.addBatch(); );
if (++batchSize > 1000) {
batchSize = 0;
insertStatement.executeBatch();
}
total++;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
} }
@Override @Override
public void close() throws SQLException { public void close() {}
if (batchSize > 0) {
insertStatement.executeBatch();
}
logger.info("Inserted {} links", total);
insertStatement.close();
connection.close();
}
} }
} }

View File

@ -1,176 +0,0 @@
package nu.marginalia.loading.links;
import com.google.common.collect.Lists;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter;
import nu.marginalia.io.processed.DomainRecordParquetFileWriter;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.loader.DbTestUtil;
import nu.marginalia.loading.LoaderInputData;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.junit.jupiter.api.*;
import org.mockito.Mockito;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
import static org.junit.jupiter.api.Assertions.assertEquals;
@Tag("slow")
@Testcontainers
@Disabled // Error in the SQL loading mechanism, we don't deal with DELIMITER correctly
// which means we can't get around flyway's bugs necessitating DELIMITER.
class DomainLinksLoaderServiceTest {
List<Path> toDelete = new ArrayList<>();
ProcessHeartbeat heartbeat;
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("db/migration/V23_06_0_000__base.sql")
.withNetworkAliases("mariadb");
HikariDataSource dataSource;
@BeforeEach
public void setUp() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
config.setUsername("wmsa");
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
List<String> migrations = List.of(
"db/migration/V23_11_0_007__domain_node_affinity.sql",
"db/migration/V23_11_0_008__purge_procedure.sql"
);
for (String migration : migrations) {
try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration),
"Could not load migration script " + migration);
var conn = dataSource.getConnection();
var stmt = conn.createStatement()
) {
String script = new String(resource.readAllBytes());
String[] cmds = script.split("\\s*;\\s*");
for (String cmd : cmds) {
if (cmd.isBlank())
continue;
System.out.println(cmd);
stmt.executeUpdate(cmd);
}
} catch (IOException | SQLException ex) {
}
}
heartbeat = Mockito.mock(ProcessHeartbeat.class);
Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn(
Mockito.mock(ProcessAdHocTaskHeartbeat.class)
);
}
@AfterEach
public void tearDown() throws IOException {
for (var path : Lists.reverse(toDelete)) {
Files.deleteIfExists(path);
}
toDelete.clear();
dataSource.close();
}
@Test
public void test() throws IOException, SQLException {
Path workDir = Files.createTempDirectory(getClass().getSimpleName());
Path parquetFile1 = ProcessedDataFileNames.domainFileName(workDir, 0);
Path parquetFile2 = ProcessedDataFileNames.domainLinkFileName(workDir, 0);
Path parquetFile3 = ProcessedDataFileNames.domainLinkFileName(workDir, 1);
toDelete.add(workDir);
toDelete.add(parquetFile1);
toDelete.add(parquetFile2);
toDelete.add(parquetFile3);
List<String> domains1 = List.of("www.marginalia.nu", "search.marginalia.nu");
List<String> linkDomains1 = List.of("wiby.me", "www.mojeek.com", "www.altavista.com");
List<String> linkDomains2 = List.of("maya.land", "xkcd.com", "aaronsw.com");
try (var pw = new DomainRecordParquetFileWriter(parquetFile1)) {
for (var domain : domains1) {
pw.write(dr(domain));
}
}
try (var pw = new DomainLinkRecordParquetFileWriter(parquetFile2)) {
for (var domain : linkDomains1) {
pw.write(dl("www.marginalia.nu", domain));
}
}
try (var pw = new DomainLinkRecordParquetFileWriter(parquetFile3)) {
for (var domain : linkDomains2) {
pw.write(dl("search.marginalia.nu", domain));
}
}
try (var dataSource = DbTestUtil.getConnection(mariaDBContainer.getJdbcUrl());
var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK
""")
) {
var domainService = new DomainLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID()));
var input = new LoaderInputData(workDir, 2);
var domainRegistry = domainService.getOrCreateDomainIds(input);
var dls = new DomainLinksLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID()));
dls.loadLinks(domainRegistry, heartbeat, input);
Map<Integer, Set<Integer>> expected = new HashMap<>();
Map<Integer, Set<Integer>> actual = new HashMap<>();
expected.put(domainRegistry.getDomainId("www.marginalia.nu"), new HashSet<>());
expected.put(domainRegistry.getDomainId("search.marginalia.nu"), new HashSet<>());
for (var domain : linkDomains1) {
expected.get(domainRegistry.getDomainId("www.marginalia.nu")).add(domainRegistry.getDomainId(domain));
}
for (var domain : linkDomains2) {
expected.get(domainRegistry.getDomainId("search.marginalia.nu")).add(domainRegistry.getDomainId(domain));
}
var rs = query.executeQuery();
while (rs.next()) {
actual.computeIfAbsent(rs.getInt(1), k -> new HashSet<>())
.add(rs.getInt(2));
}
assertEquals(expected, actual);
}
}
private DomainRecord dr(String domainName) {
return new DomainRecord(domainName, 0, 0, 0, null, null, null, null);
}
private DomainLinkRecord dl(String sourceDomainName, String destDomainName) {
return new DomainLinkRecord(sourceDomainName, destDomainName);
}
}

View File

@ -21,7 +21,9 @@ dependencies {
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:common:process') implementation project(':code:common:process')
implementation project(':code:common:service-client')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:api:query-api')
implementation libs.bundles.slf4j implementation libs.bundles.slf4j

View File

@ -1,26 +1,25 @@
package nu.marginalia.adjacencies; package nu.marginalia.adjacencies;
import com.zaxxer.hikari.HikariDataSource;
import gnu.trove.list.TIntList; import gnu.trove.list.TIntList;
import gnu.trove.list.array.TIntArrayList; import gnu.trove.list.array.TIntArrayList;
import gnu.trove.map.hash.TIntObjectHashMap; import gnu.trove.map.hash.TIntObjectHashMap;
import gnu.trove.set.hash.TIntHashSet; import gnu.trove.set.hash.TIntHashSet;
import nu.marginalia.query.client.QueryClient;
import org.roaringbitmap.RoaringBitmap; import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
public class AdjacenciesData { public class AdjacenciesData {
TIntList idsList = new TIntArrayList(100_000); private static final Logger logger = LoggerFactory.getLogger(AdjacenciesData.class);
ArrayList<SparseBitVector> itemsList = new ArrayList<>(100_000); private final TIntList idsList = new TIntArrayList(100_000);
private final ArrayList<SparseBitVector> itemsList = new ArrayList<>(100_000);
TIntObjectHashMap<SparseBitVector> dToSMap = new TIntObjectHashMap<>(100_000); private final TIntObjectHashMap<SparseBitVector> dToSMap = new TIntObjectHashMap<>(100_000);
TIntObjectHashMap<RoaringBitmap> sToDMap = new TIntObjectHashMap<>(100_000); private final TIntObjectHashMap<RoaringBitmap> sToDMap = new TIntObjectHashMap<>(100_000);
RoaringBitmap indexed = new RoaringBitmap();
public TIntHashSet getCandidates(SparseBitVector vec) { public TIntHashSet getCandidates(SparseBitVector vec) {
TIntHashSet ret = new TIntHashSet(); TIntHashSet ret = new TIntHashSet();
@ -36,39 +35,31 @@ public class AdjacenciesData {
return ret; return ret;
} }
public AdjacenciesData(HikariDataSource dataSource, DomainAliases aliases) throws SQLException { public AdjacenciesData(QueryClient queryClient,
DomainAliases aliases) {
logger.info("Loading adjacency data");
Map<Integer, RoaringBitmap> tmpMapDtoS = new HashMap<>(100_000); Map<Integer, RoaringBitmap> tmpMapDtoS = new HashMap<>(100_000);
try (
var conn = dataSource.getConnection();
var indexedStmt = conn.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE INDEXED>0");
var linksStmt = conn.prepareStatement("SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK")) {
ResultSet rsp;
indexedStmt.setFetchSize(10_000); int count = 0;
rsp = indexedStmt.executeQuery(); var allLinks = queryClient.getAllDomainLinks();
while (rsp.next()) { for (var iter = allLinks.iterator();;count++) {
indexed.add(rsp.getInt(1)); if (!iter.advance()) {
break;
} }
int source = aliases.deAlias(iter.source());
int dest = aliases.deAlias(iter.dest());
linksStmt.setFetchSize(10_000); tmpMapDtoS.computeIfAbsent(dest, this::createBitmapWithSelf).add(source);
rsp = linksStmt.executeQuery(); RoaringBitmap sToDEntry = sToDMap.get(source);
while (rsp.next()) { if (sToDEntry == null) {
int source = aliases.deAlias(rsp.getInt(1)); sToDEntry = new RoaringBitmap();
int dest = aliases.deAlias(rsp.getInt(2)); sToDMap.put(source, sToDEntry);
sToDEntry.add(source);
tmpMapDtoS.computeIfAbsent(dest, this::createBitmapWithSelf).add(source);
RoaringBitmap sToDEntry = sToDMap.get(source);
if (sToDEntry == null) {
sToDEntry = new RoaringBitmap();
sToDMap.put(source, sToDEntry);
sToDEntry.add(source);
}
sToDEntry.add(dest);
} }
sToDEntry.add(dest);
} }
logger.info("Links loaded: {}", count);
tmpMapDtoS.entrySet().stream() tmpMapDtoS.entrySet().stream()
.filter(e -> isEligible(e.getValue())) .filter(e -> isEligible(e.getValue()))
@ -79,10 +70,10 @@ public class AdjacenciesData {
dToSMap.put(e.getKey(), val); dToSMap.put(e.getKey(), val);
}); });
logger.info("All adjacency dat loaded");
} }
private boolean isEligible(RoaringBitmap value) { private boolean isEligible(RoaringBitmap value) {
// return true;
int cardinality = value.getCardinality(); int cardinality = value.getCardinality();
return cardinality < 10000; return cardinality < 10000;
@ -95,10 +86,6 @@ public class AdjacenciesData {
return bm; return bm;
} }
public boolean isIndexedDomain(int domainId) {
return indexed.contains(domainId);
}
public TIntList getIdsList() { public TIntList getIdsList() {
return idsList; return idsList;
} }

View File

@ -7,7 +7,10 @@ import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl; import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.query.client.QueryClient;
import nu.marginalia.service.module.DatabaseModule; import nu.marginalia.service.module.DatabaseModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.*; import java.util.*;
@ -23,13 +26,14 @@ public class WebsiteAdjacenciesCalculator {
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
public AdjacenciesData adjacenciesData; public AdjacenciesData adjacenciesData;
public DomainAliases domainAliases; public DomainAliases domainAliases;
private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class);
float[] weights; float[] weights;
public WebsiteAdjacenciesCalculator(HikariDataSource dataSource) throws SQLException { public WebsiteAdjacenciesCalculator(QueryClient queryClient, HikariDataSource dataSource) throws SQLException {
this.dataSource = dataSource; this.dataSource = dataSource;
domainAliases = new DomainAliases(dataSource); domainAliases = new DomainAliases(dataSource);
adjacenciesData = new AdjacenciesData(dataSource, domainAliases); adjacenciesData = new AdjacenciesData(queryClient, domainAliases);
weights = adjacenciesData.getWeights(); weights = adjacenciesData.getWeights();
} }
@ -47,7 +51,6 @@ public class WebsiteAdjacenciesCalculator {
for (int domainId : domainIds) { for (int domainId : domainIds) {
findAdjacentDtoS(domainId, similarities -> { findAdjacentDtoS(domainId, similarities -> {
for (var similarity : similarities.similarities()) { for (var similarity : similarities.similarities()) {
if (adjacenciesData.isIndexedDomain(similarity.domainId)) System.out.print("*");
System.out.println(dataStoreDao.getDomain(similarity.domainId).map(Object::toString).orElse("") + " " + prettyPercent(similarity.value)); System.out.println(dataStoreDao.getDomain(similarity.domainId).map(Object::toString).orElse("") + " " + prettyPercent(similarity.value));
} }
}); });
@ -186,8 +189,9 @@ public class WebsiteAdjacenciesCalculator {
DatabaseModule dm = new DatabaseModule(); DatabaseModule dm = new DatabaseModule();
var dataSource = dm.provideConnection(); var dataSource = dm.provideConnection();
var qc = new QueryClient();
var main = new WebsiteAdjacenciesCalculator(dataSource); var main = new WebsiteAdjacenciesCalculator(qc, dataSource);
if (args.length == 1 && "load".equals(args[0])) { if (args.length == 1 && "load".equals(args[0])) {
var processHeartbeat = new ProcessHeartbeatImpl( var processHeartbeat = new ProcessHeartbeatImpl(
@ -195,9 +199,16 @@ public class WebsiteAdjacenciesCalculator {
dataSource dataSource
); );
processHeartbeat.start(); try {
main.loadAll(processHeartbeat); processHeartbeat.start();
processHeartbeat.shutDown(); main.loadAll(processHeartbeat);
}
catch (Exception ex) {
logger.error("Failed to load", ex);
}
finally {
processHeartbeat.shutDown();
}
return; return;
} }

View File

@ -66,7 +66,7 @@ public class BrowseCommand implements SearchCommandInterface {
return browseService.getRandomEntries(set); return browseService.getRandomEntries(set);
} }
else { else {
return browseService.getRelatedEntries(word); return browseService.getRelatedEntries(ctx, word);
} }
} }
catch (Exception ex) { catch (Exception ex) {

View File

@ -2,6 +2,7 @@ package nu.marginalia.search.results;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import nu.marginalia.assistant.client.model.SimilarDomain;
import nu.marginalia.browse.model.BrowseResult; import nu.marginalia.browse.model.BrowseResult;
import nu.marginalia.screenshot.ScreenshotService; import nu.marginalia.screenshot.ScreenshotService;
@ -18,7 +19,7 @@ public class BrowseResultCleaner {
this.screenshotService = screenshotService; this.screenshotService = screenshotService;
} }
public Predicate<BrowseResult> shouldRemoveResultPredicate() { public Predicate<BrowseResult> shouldRemoveResultPredicateBr() {
Set<String> domainHashes = new HashSet<>(100); Set<String> domainHashes = new HashSet<>(100);
return (res) -> !screenshotService.hasScreenshot(res.domainId()) return (res) -> !screenshotService.hasScreenshot(res.domainId())

View File

@ -1,16 +1,18 @@
package nu.marginalia.search.svc; package nu.marginalia.search.svc;
import com.google.inject.Inject; import com.google.inject.Inject;
import nu.marginalia.assistant.client.AssistantClient;
import nu.marginalia.assistant.client.model.SimilarDomain;
import nu.marginalia.browse.DbBrowseDomainsRandom; import nu.marginalia.browse.DbBrowseDomainsRandom;
import nu.marginalia.browse.DbBrowseDomainsSimilarCosine;
import nu.marginalia.browse.DbBrowseDomainsSimilarOldAlgo;
import nu.marginalia.browse.model.BrowseResult; import nu.marginalia.browse.model.BrowseResult;
import nu.marginalia.browse.model.BrowseResultSet; import nu.marginalia.browse.model.BrowseResultSet;
import nu.marginalia.client.Context;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.search.results.BrowseResultCleaner; import nu.marginalia.search.results.BrowseResultCleaner;
import java.util.ArrayList;
import java.util.HashSet; import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -19,55 +21,60 @@ import static java.util.Collections.shuffle;
public class SearchBrowseService { public class SearchBrowseService {
private final DbBrowseDomainsRandom randomDomains; private final DbBrowseDomainsRandom randomDomains;
private final DbBrowseDomainsSimilarCosine similarDomains;
private final DbBrowseDomainsSimilarOldAlgo similarDomainsOld;
private final DbDomainQueries domainQueries; private final DbDomainQueries domainQueries;
private final DomainBlacklist blacklist; private final DomainBlacklist blacklist;
private final AssistantClient assistantClient;
private final BrowseResultCleaner browseResultCleaner; private final BrowseResultCleaner browseResultCleaner;
@Inject @Inject
public SearchBrowseService(DbBrowseDomainsRandom randomDomains, public SearchBrowseService(DbBrowseDomainsRandom randomDomains,
DbBrowseDomainsSimilarCosine similarDomains,
DbBrowseDomainsSimilarOldAlgo similarDomainsOld,
DbDomainQueries domainQueries, DbDomainQueries domainQueries,
DomainBlacklist blacklist, DomainBlacklist blacklist,
AssistantClient assistantClient,
BrowseResultCleaner browseResultCleaner) BrowseResultCleaner browseResultCleaner)
{ {
this.randomDomains = randomDomains; this.randomDomains = randomDomains;
this.similarDomains = similarDomains;
this.similarDomainsOld = similarDomainsOld;
this.domainQueries = domainQueries; this.domainQueries = domainQueries;
this.blacklist = blacklist; this.blacklist = blacklist;
this.assistantClient = assistantClient;
this.browseResultCleaner = browseResultCleaner; this.browseResultCleaner = browseResultCleaner;
} }
public BrowseResultSet getRandomEntries(int set) { public BrowseResultSet getRandomEntries(int set) {
List<BrowseResult> results = randomDomains.getRandomDomains(25, blacklist, set); List<BrowseResult> results = randomDomains.getRandomDomains(25, blacklist, set);
results.removeIf(browseResultCleaner.shouldRemoveResultPredicate()); results.removeIf(browseResultCleaner.shouldRemoveResultPredicateBr());
return new BrowseResultSet(results); return new BrowseResultSet(results);
} }
public BrowseResultSet getRelatedEntries(String word) { public BrowseResultSet getRelatedEntries(Context ctx, String domainName) {
var domain = domainQueries.getDomainId(new EdgeDomain(word)); var domain = domainQueries.getDomainId(new EdgeDomain(domainName));
var neighbors = similarDomains.getDomainNeighborsAdjacentCosineRequireScreenshot(domain, blacklist, 256); var neighbors = assistantClient.similarDomains(ctx, domain, 50).blockingFirst();
neighbors.removeIf(browseResultCleaner.shouldRemoveResultPredicate()); neighbors.removeIf(sd -> !sd.screenshot());
// If the results are very few, supplement with the alternative shitty algorithm // If the results are very few, supplement with the alternative shitty algorithm
if (neighbors.size() < 25) { if (neighbors.size() < 25) {
Set<BrowseResult> allNeighbors = new HashSet<>(neighbors); Set<SimilarDomain> allNeighbors = new HashSet<>(neighbors);
allNeighbors.addAll(similarDomainsOld.getDomainNeighborsAdjacent(domain, blacklist, 50)); allNeighbors.addAll(assistantClient.linkedDomains(ctx, domain, 50).blockingFirst());
neighbors.clear(); neighbors.clear();
neighbors.addAll(allNeighbors); neighbors.addAll(allNeighbors);
neighbors.removeIf(browseResultCleaner.shouldRemoveResultPredicate()); neighbors.removeIf(sd -> !sd.screenshot());
} }
List<BrowseResult> results = new ArrayList<>(neighbors.size());
for (SimilarDomain sd : neighbors) {
var resultDomain = domainQueries.getDomain(sd.domainId());
if (resultDomain.isEmpty())
continue;
results.add(new BrowseResult(resultDomain.get().toRootUrl(), sd.domainId(), 0, sd.screenshot()));
}
// shuffle the items for a less repetitive experience // shuffle the items for a less repetitive experience
shuffle(neighbors); shuffle(neighbors);
return new BrowseResultSet(neighbors, word); return new BrowseResultSet(results, domainName);
} }
} }

View File

@ -24,6 +24,7 @@ java {
dependencies { dependencies {
implementation project(':third-party:symspell') implementation project(':third-party:symspell')
implementation project(':code:api:assistant-api') implementation project(':code:api:assistant-api')
implementation project(':code:api:query-api')
implementation project(':code:common:config') implementation project(':code:common:config')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:common:model') implementation project(':code:common:model')

View File

@ -5,6 +5,7 @@ import nu.marginalia.geoip.GeoIpDictionary;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DbDomainQueries;
import nu.marginalia.assistant.client.model.DomainInformation; import nu.marginalia.assistant.client.model.DomainInformation;
import nu.marginalia.query.client.QueryClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -20,6 +21,7 @@ public class DomainInformationService {
private final GeoIpDictionary geoIpDictionary; private final GeoIpDictionary geoIpDictionary;
private DbDomainQueries dbDomainQueries; private DbDomainQueries dbDomainQueries;
private final QueryClient queryClient;
private HikariDataSource dataSource; private HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
@ -27,9 +29,11 @@ public class DomainInformationService {
public DomainInformationService( public DomainInformationService(
DbDomainQueries dbDomainQueries, DbDomainQueries dbDomainQueries,
GeoIpDictionary geoIpDictionary, GeoIpDictionary geoIpDictionary,
QueryClient queryClient,
HikariDataSource dataSource) { HikariDataSource dataSource) {
this.dbDomainQueries = dbDomainQueries; this.dbDomainQueries = dbDomainQueries;
this.geoIpDictionary = geoIpDictionary; this.geoIpDictionary = geoIpDictionary;
this.queryClient = queryClient;
this.dataSource = dataSource; this.dataSource = dataSource;
} }
@ -80,21 +84,8 @@ public class DomainInformationService {
inCrawlQueue = rs.next(); inCrawlQueue = rs.next();
builder.inCrawlQueue(inCrawlQueue); builder.inCrawlQueue(inCrawlQueue);
rs = stmt.executeQuery(STR.""" builder.incomingLinks(queryClient.countLinksToDomain(domainId));
SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE DEST_DOMAIN_ID=\{domainId} builder.outboundLinks(queryClient.countLinksFromDomain(domainId));
""");
if (rs.next()) {
builder.incomingLinks(rs.getInt(1));
}
rs = stmt.executeQuery(STR."""
SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=\{domainId}
""");
if (rs.next()) {
builder.outboundLinks(rs.getInt(1));
outboundLinks = rs.getInt(1);
}
rs = stmt.executeQuery(STR.""" rs = stmt.executeQuery(STR."""
SELECT KNOWN_URLS, GOOD_URLS, VISITED_URLS FROM DOMAIN_METADATA WHERE ID=\{domainId} SELECT KNOWN_URLS, GOOD_URLS, VISITED_URLS FROM DOMAIN_METADATA WHERE ID=\{domainId}

View File

@ -10,6 +10,7 @@ import gnu.trove.set.TIntSet;
import gnu.trove.set.hash.TIntHashSet; import gnu.trove.set.hash.TIntHashSet;
import nu.marginalia.assistant.client.model.SimilarDomain; import nu.marginalia.assistant.client.model.SimilarDomain;
import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.query.client.QueryClient;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -25,14 +26,13 @@ public class SimilarDomainsService {
private static final Logger logger = LoggerFactory.getLogger(SimilarDomainsService.class); private static final Logger logger = LoggerFactory.getLogger(SimilarDomainsService.class);
private final HikariDataSource dataSource; private final HikariDataSource dataSource;
private final QueryClient queryClient;
private volatile TIntIntHashMap domainIdToIdx = new TIntIntHashMap(100_000); private volatile TIntIntHashMap domainIdToIdx = new TIntIntHashMap(100_000);
private volatile int[] domainIdxToId; private volatile int[] domainIdxToId;
public volatile TIntDoubleHashMap[] relatedDomains; public volatile TIntDoubleHashMap[] relatedDomains;
public volatile TIntList[] domainNeighbors = null; public volatile TIntList[] domainNeighbors = null;
public volatile TIntList[] linkStoD = null;
public volatile TIntList[] linkDtoS = null;
public volatile BitSet screenshotDomains = null; public volatile BitSet screenshotDomains = null;
public volatile BitSet activeDomains = null; public volatile BitSet activeDomains = null;
public volatile BitSet indexedDomains = null; public volatile BitSet indexedDomains = null;
@ -42,8 +42,9 @@ public class SimilarDomainsService {
volatile boolean isReady = false; volatile boolean isReady = false;
@Inject @Inject
public SimilarDomainsService(HikariDataSource dataSource) { public SimilarDomainsService(HikariDataSource dataSource, QueryClient queryClient) {
this.dataSource = dataSource; this.dataSource = dataSource;
this.queryClient = queryClient;
Executors.newSingleThreadExecutor().submit(this::init); Executors.newSingleThreadExecutor().submit(this::init);
} }
@ -70,8 +71,6 @@ public class SimilarDomainsService {
domainRanks = new double[domainIdToIdx.size()]; domainRanks = new double[domainIdToIdx.size()];
domainNames = new String[domainIdToIdx.size()]; domainNames = new String[domainIdToIdx.size()];
domainNeighbors = new TIntList[domainIdToIdx.size()]; domainNeighbors = new TIntList[domainIdToIdx.size()];
linkStoD = new TIntList[domainIdToIdx.size()];
linkDtoS = new TIntList[domainIdToIdx.size()];
screenshotDomains = new BitSet(domainIdToIdx.size()); screenshotDomains = new BitSet(domainIdToIdx.size());
activeDomains = new BitSet(domainIdToIdx.size()); activeDomains = new BitSet(domainIdToIdx.size());
indexedDomains = new BitSet(domainIdToIdx.size()); indexedDomains = new BitSet(domainIdToIdx.size());
@ -108,27 +107,6 @@ public class SimilarDomainsService {
logger.info("Loaded {} related domains", relatedDomains.length); logger.info("Loaded {} related domains", relatedDomains.length);
rs = stmt.executeQuery("""
SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK
""");
while (rs.next()) {
int source = rs.getInt(1);
int dest = rs.getInt(2);
int sourceIdx = domainIdToIdx.get(source);
int destIdx = domainIdToIdx.get(dest);
if (linkStoD[sourceIdx] == null)
linkStoD[sourceIdx] = new TIntArrayList(32);
if (linkDtoS[destIdx] == null)
linkDtoS[destIdx] = new TIntArrayList(32);
linkStoD[sourceIdx].add(destIdx);
linkDtoS[destIdx].add(sourceIdx);
}
logger.info("Loaded links...");
rs = stmt.executeQuery(""" rs = stmt.executeQuery("""
SELECT EC_DOMAIN.ID, SELECT EC_DOMAIN.ID,
@ -167,7 +145,6 @@ public class SimilarDomainsService {
} }
logger.info("Loaded {} domains", domainRanks.length); logger.info("Loaded {} domains", domainRanks.length);
logger.info("All done!");
isReady = true; isReady = true;
} }
} }
@ -272,17 +249,23 @@ public class SimilarDomainsService {
} }
private TIntSet getLinkingIdsDToS(int domainIdx) { private TIntSet getLinkingIdsDToS(int domainIdx) {
var items = linkDtoS[domainIdx]; var items = new TIntHashSet();
if (items == null)
return new TIntHashSet(); for (int id : queryClient.getLinksFromDomain(domainIdxToId[domainIdx])) {
return new TIntHashSet(items); items.add(domainIdToIdx.get(id));
}
return items;
} }
private TIntSet getLinkingIdsSToD(int domainIdx) { private TIntSet getLinkingIdsSToD(int domainIdx) {
var items = linkStoD[domainIdx]; var items = new TIntHashSet();
if (items == null)
return new TIntHashSet(); for (int id : queryClient.getLinksToDomain(domainIdxToId[domainIdx])) {
return new TIntHashSet(items); items.add(domainIdToIdx.get(id));
}
return items;
} }
public List<SimilarDomain> getLinkingDomains(int domainId, int count) { public List<SimilarDomain> getLinkingDomains(int domainId, int count) {

View File

@ -26,6 +26,7 @@ dependencies {
implementation project(':code:common:model') implementation project(':code:common:model')
implementation project(':code:common:process') implementation project(':code:common:process')
implementation project(':code:common:db') implementation project(':code:common:db')
implementation project(':code:common:linkdb')
implementation project(':code:common:service') implementation project(':code:common:service')
implementation project(':code:common:service-client') implementation project(':code:common:service-client')

View File

@ -4,9 +4,6 @@ import com.google.gson.Gson;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep; import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;

View File

@ -19,6 +19,9 @@ import java.sql.SQLException;
import java.time.LocalDateTime; import java.time.LocalDateTime;
import java.util.List; import java.util.List;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME;
public class BackupService { public class BackupService {
private final FileStorageService storageService; private final FileStorageService storageService;
@ -26,6 +29,7 @@ public class BackupService {
public enum BackupHeartbeatSteps { public enum BackupHeartbeatSteps {
LINKS, LINKS,
DOCS,
JOURNAL, JOURNAL,
DONE DONE
} }
@ -57,8 +61,11 @@ public class BackupService {
try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Backup")) { try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Backup")) {
heartbeat.progress(BackupHeartbeatSteps.DOCS);
backupFileCompressed(DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage.asPath());
heartbeat.progress(BackupHeartbeatSteps.LINKS); heartbeat.progress(BackupHeartbeatSteps.LINKS);
backupFileCompressed("links.db", linkdbStagingStorage, backupStorage.asPath()); backupFileCompressed(DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage.asPath());
heartbeat.progress(BackupHeartbeatSteps.JOURNAL); heartbeat.progress(BackupHeartbeatSteps.JOURNAL);
// This file format is already compressed // This file format is already compressed
@ -79,8 +86,11 @@ public class BackupService {
var linkdbStagingStorage = IndexLocations.getLinkdbWritePath(storageService); var linkdbStagingStorage = IndexLocations.getLinkdbWritePath(storageService);
try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Restore Backup")) { try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Restore Backup")) {
heartbeat.progress(BackupHeartbeatSteps.DOCS);
restoreBackupCompressed(DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage);
heartbeat.progress(BackupHeartbeatSteps.LINKS); heartbeat.progress(BackupHeartbeatSteps.LINKS);
restoreBackupCompressed("links.db", linkdbStagingStorage, backupStorage); restoreBackupCompressed(DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage);
heartbeat.progress(BackupHeartbeatSteps.JOURNAL); heartbeat.progress(BackupHeartbeatSteps.JOURNAL);
restoreJournal(indexStagingStorage, backupStorage); restoreJournal(indexStagingStorage, backupStorage);

View File

@ -4,17 +4,27 @@ import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import com.google.inject.name.Named; import com.google.inject.name.Named;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.linkdb.DomainLinkDb;
import nu.marginalia.linkdb.FileDomainLinkDb;
import nu.marginalia.linkdb.SqlDomainLinkDb;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.IndexLocations; import nu.marginalia.IndexLocations;
import nu.marginalia.index.config.RankingSettings; import nu.marginalia.index.config.RankingSettings;
import nu.marginalia.WmsaHome; import nu.marginalia.WmsaHome;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.sql.SQLException; import java.sql.SQLException;
import static nu.marginalia.linkdb.LinkdbFileNames.*;
public class IndexModule extends AbstractModule { public class IndexModule extends AbstractModule {
private static final Logger logger = LoggerFactory.getLogger(IndexModule.class);
public void configure() { public void configure() {
} }
@ -25,11 +35,49 @@ public class IndexModule extends AbstractModule {
return RankingSettings.from(dir); return RankingSettings.from(dir);
} }
@Provides
@Singleton
public DomainLinkDb domainLinkDb (
FileStorageService storageService,
HikariDataSource dataSource,
ServiceConfiguration serviceConfiguration
) throws IOException
{
Path path = IndexLocations.getLinkdbLivePath(storageService).resolve(DOMAIN_LINKS_FILE_NAME);
if (Files.exists(path)) {
logger.info("Using file domain link db {}", path);
return new FileDomainLinkDb(path);
}
else {
logger.warn("Using legacy sql domain link db");
return new SqlDomainLinkDb(path, dataSource, serviceConfiguration);
}
}
@Provides @Provides
@Singleton @Singleton
@Named("linkdb-file") @Named("docdb-file")
public Path linkdbPath(FileStorageService storageService) throws SQLException { public Path linkdbPath(FileStorageService storageService) throws IOException {
return IndexLocations.getLinkdbLivePath(storageService).resolve("links.db"); // Migrate from old location
Path migrationMarker = IndexLocations.getLinkdbLivePath(storageService).resolve("migrated-links.db-to-documents.db");
Path oldPath = IndexLocations.getLinkdbLivePath(storageService).resolve(DEPRECATED_LINKDB_FILE_NAME);
Path newPath = IndexLocations.getLinkdbLivePath(storageService).resolve(DOCDB_FILE_NAME);
if (Files.exists(oldPath) && !Files.exists(newPath) && !Files.exists(migrationMarker)) {
logger.info("Migrating {} to {}", oldPath, newPath);
Files.move(oldPath, newPath);
Files.createFile(migrationMarker);
}
return newPath;
}
@Provides
@Singleton
@Named("domain-linkdb-file")
public Path domainLinkDbFile(FileStorageService storageService) throws SQLException {
return IndexLocations.getLinkdbLivePath(storageService).resolve(DOMAIN_LINKS_FILE_NAME);
} }
} }

View File

@ -6,12 +6,14 @@ import io.grpc.ServerBuilder;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.IndexLocations; import nu.marginalia.IndexLocations;
import nu.marginalia.index.svc.IndexDomainLinksService;
import nu.marginalia.linkdb.DomainLinkDb;
import nu.marginalia.storage.FileStorageService; import nu.marginalia.storage.FileStorageService;
import nu.marginalia.index.client.IndexMqEndpoints; import nu.marginalia.index.client.IndexMqEndpoints;
import nu.marginalia.index.index.SearchIndex; import nu.marginalia.index.index.SearchIndex;
import nu.marginalia.index.svc.IndexOpsService; import nu.marginalia.index.svc.IndexOpsService;
import nu.marginalia.index.svc.IndexQueryService; import nu.marginalia.index.svc.IndexQueryService;
import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.linkdb.DocumentDbReader;
import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.server.*; import nu.marginalia.service.server.*;
@ -28,6 +30,8 @@ import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME;
import static spark.Spark.get; import static spark.Spark.get;
public class IndexService extends Service { public class IndexService extends Service {
@ -38,8 +42,9 @@ public class IndexService extends Service {
private final IndexOpsService opsService; private final IndexOpsService opsService;
private final SearchIndex searchIndex; private final SearchIndex searchIndex;
private final FileStorageService fileStorageService; private final FileStorageService fileStorageService;
private final LinkdbReader linkdbReader; private final DocumentDbReader documentDbReader;
private final DomainLinkDb domainLinkDb;
private final ServiceEventLog eventLog; private final ServiceEventLog eventLog;
@ -49,14 +54,17 @@ public class IndexService extends Service {
IndexQueryService indexQueryService, IndexQueryService indexQueryService,
SearchIndex searchIndex, SearchIndex searchIndex,
FileStorageService fileStorageService, FileStorageService fileStorageService,
LinkdbReader linkdbReader, DocumentDbReader documentDbReader,
DomainLinkDb domainLinkDb,
IndexDomainLinksService indexDomainLinksService,
ServiceEventLog eventLog) throws IOException { ServiceEventLog eventLog) throws IOException {
super(params); super(params);
this.opsService = opsService; this.opsService = opsService;
this.searchIndex = searchIndex; this.searchIndex = searchIndex;
this.fileStorageService = fileStorageService; this.fileStorageService = fileStorageService;
this.linkdbReader = linkdbReader; this.documentDbReader = documentDbReader;
this.domainLinkDb = domainLinkDb;
this.eventLog = eventLog; this.eventLog = eventLog;
final Gson gson = GsonFactory.get(); final Gson gson = GsonFactory.get();
@ -65,6 +73,7 @@ public class IndexService extends Service {
var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1) var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1)
.addService(indexQueryService) .addService(indexQueryService)
.addService(indexDomainLinksService)
.build(); .build();
grpcServer.start(); grpcServer.start();
@ -99,15 +108,24 @@ public class IndexService extends Service {
@SneakyThrows @SneakyThrows
@MqRequest(endpoint = IndexMqEndpoints.SWITCH_LINKDB) @MqRequest(endpoint = IndexMqEndpoints.SWITCH_LINKDB)
public void switchLinkdb(String unusedArg) { public void switchLinkdb(String unusedArg) {
logger.info("Switching link database"); logger.info("Switching link databases");
Path newPath = IndexLocations Path newPathDocs = IndexLocations
.getLinkdbWritePath(fileStorageService) .getLinkdbWritePath(fileStorageService)
.resolve("links.db"); .resolve(DOCDB_FILE_NAME);
if (Files.exists(newPath)) { if (Files.exists(newPathDocs)) {
eventLog.logEvent("INDEX-SWITCH-LINKDB", ""); eventLog.logEvent("INDEX-SWITCH-DOCKDB", "");
linkdbReader.switchInput(newPath); documentDbReader.switchInput(newPathDocs);
}
Path newPathDomains = IndexLocations
.getLinkdbWritePath(fileStorageService)
.resolve(DOMAIN_LINKS_FILE_NAME);
if (Files.exists(newPathDomains)) {
eventLog.logEvent("INDEX-SWITCH-DOMAIN-LINKDB", "");
domainLinkDb.switchInput(newPathDomains);
} }
} }

View File

@ -7,8 +7,8 @@ import gnu.trove.list.array.TLongArrayList;
import nu.marginalia.index.client.model.results.DecoratedSearchResultItem; import nu.marginalia.index.client.model.results.DecoratedSearchResultItem;
import nu.marginalia.index.client.model.results.ResultRankingContext; import nu.marginalia.index.client.model.results.ResultRankingContext;
import nu.marginalia.index.client.model.results.SearchResultItem; import nu.marginalia.index.client.model.results.SearchResultItem;
import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.linkdb.DocumentDbReader;
import nu.marginalia.linkdb.model.LdbUrlDetail; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import nu.marginalia.ranking.ResultValuator; import nu.marginalia.ranking.ResultValuator;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -25,13 +25,13 @@ public class IndexResultDecorator {
private static final Logger logger = LoggerFactory.getLogger(IndexResultDecorator.class); private static final Logger logger = LoggerFactory.getLogger(IndexResultDecorator.class);
private final LinkdbReader linkdbReader; private final DocumentDbReader documentDbReader;
private final ResultValuator valuator; private final ResultValuator valuator;
@Inject @Inject
public IndexResultDecorator(LinkdbReader linkdbReader, public IndexResultDecorator(DocumentDbReader documentDbReader,
ResultValuator valuator) { ResultValuator valuator) {
this.linkdbReader = linkdbReader; this.documentDbReader = documentDbReader;
this.valuator = valuator; this.valuator = valuator;
} }
@ -46,9 +46,9 @@ public class IndexResultDecorator {
for (var result : rawResults) for (var result : rawResults)
idsList.add(result.getDocumentId()); idsList.add(result.getDocumentId());
Map<Long, LdbUrlDetail> urlDetailsById = new HashMap<>(rawResults.size()); Map<Long, DocdbUrlDetail> urlDetailsById = new HashMap<>(rawResults.size());
for (var item : linkdbReader.getUrlDetails(idsList)) for (var item : documentDbReader.getUrlDetails(idsList))
urlDetailsById.put(item.urlId(), item); urlDetailsById.put(item.urlId(), item);
List<DecoratedSearchResultItem> decoratedItems = new ArrayList<>(); List<DecoratedSearchResultItem> decoratedItems = new ArrayList<>();
@ -63,7 +63,7 @@ public class IndexResultDecorator {
} }
private DecoratedSearchResultItem createCombinedItem(SearchResultItem result, private DecoratedSearchResultItem createCombinedItem(SearchResultItem result,
LdbUrlDetail linkData, DocdbUrlDetail linkData,
ResultRankingContext rankingContext) { ResultRankingContext rankingContext) {
return new DecoratedSearchResultItem( return new DecoratedSearchResultItem(
result, result,

View File

@ -0,0 +1,104 @@
package nu.marginalia.index.svc;
import com.google.inject.Inject;
import io.grpc.stub.StreamObserver;
import nu.marginalia.index.api.*;
import nu.marginalia.linkdb.DomainLinkDb;
import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall;
/** GRPC service for interrogating domain links.
 * <p>
 * Queries are delegated to a {@link DomainLinkDb}, which holds the
 * domain-to-domain link graph in memory (loaded from a file, or from the
 * legacy EC_DOMAIN_LINK table until the migration has run).
 */
public class IndexDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase {
    private final DomainLinkDb domainLinkDb;

    @Inject
    public IndexDomainLinksService(DomainLinkDb domainLinkDb) {
        this.domainLinkDb = domainLinkDb;
    }

    /** Streams every (source, dest) link pair, batched into
     *  {@code RpcDomainIdPairs} messages to bound per-message size. */
    @Override
    public void getAllLinks(nu.marginalia.index.api.Empty request,
                            io.grpc.stub.StreamObserver<nu.marginalia.index.api.RpcDomainIdPairs> responseObserver) {
        try (var idsConverter = new AllIdsResponseConverter(responseObserver)) {
            domainLinkDb.forEach(idsConverter::accept);
        }
        catch (Exception e) {
            // Propagate failures to the client instead of leaving the RPC hanging
            responseObserver.onError(e);
            return;
        }
        // close() above has flushed the final partial batch at this point
        responseObserver.onCompleted();
    }

    /** Accumulates link pairs into fixed-size protobuf batches, flushing the
     *  final partial batch on close().  Not thread safe. */
    private static class AllIdsResponseConverter implements AutoCloseable {
        private static final int BATCH_SIZE = 1000;

        private RpcDomainIdPairs.Builder builder;
        private final io.grpc.stub.StreamObserver<RpcDomainIdPairs> responseObserver;
        private int n = 0;

        private AllIdsResponseConverter(io.grpc.stub.StreamObserver<RpcDomainIdPairs> responseObserver) {
            this.responseObserver = responseObserver;
            this.builder = RpcDomainIdPairs.newBuilder();
        }

        public void accept(int source, int dest) {
            builder.addSourceIds(source);
            builder.addDestIds(dest);

            // Flush once the batch is full to keep each message bounded
            if (++n >= BATCH_SIZE) {
                responseObserver.onNext(builder.build());
                builder = RpcDomainIdPairs.newBuilder();
                n = 0;
            }
        }

        @Override
        public void close() {
            if (n > 0) {
                responseObserver.onNext(builder.build());
            }
        }
    }

    /** Returns the ids of all domains that the given domain links to. */
    @Override
    public void getLinksFromDomain(RpcDomainId request,
                                   StreamObserver<RpcDomainIdList> responseObserver) {
        var links = domainLinkDb.findDestinations(request.getDomainId());

        var rspBuilder = RpcDomainIdList.newBuilder();
        for (int i = 0; i < links.size(); i++) {
            rspBuilder.addDomainId(links.get(i));
        }
        responseObserver.onNext(rspBuilder.build());

        responseObserver.onCompleted();
    }

    /** Returns the ids of all domains that link to the given domain. */
    @Override
    public void getLinksToDomain(RpcDomainId request,
                                 StreamObserver<RpcDomainIdList> responseObserver) {
        var links = domainLinkDb.findSources(request.getDomainId());

        var rspBuilder = RpcDomainIdList.newBuilder();
        for (int i = 0; i < links.size(); i++) {
            rspBuilder.addDomainId(links.get(i));
        }
        responseObserver.onNext(rspBuilder.build());

        responseObserver.onCompleted();
    }

    /** Counts the number of outgoing links from the given domain. */
    @Override
    public void countLinksFromDomain(RpcDomainId request,
                                     StreamObserver<RpcDomainIdCount> responseObserver) {
        responseObserver.onNext(RpcDomainIdCount.newBuilder()
                .setIdCount(domainLinkDb.countDestinations(request.getDomainId()))
                .build());
        responseObserver.onCompleted();
    }

    /** Counts the number of incoming links to the given domain. */
    @Override
    public void countLinksToDomain(RpcDomainId request,
                                   StreamObserver<RpcDomainIdCount> responseObserver) {
        responseObserver.onNext(RpcDomainIdCount.newBuilder()
                .setIdCount(domainLinkDb.countSources(request.getDomainId()))
                .build());
        responseObserver.onCompleted();
    }
}

View File

@ -59,6 +59,7 @@ public class IndexOpsService {
public <T> Optional<T> run(Callable<T> c) throws Exception { public <T> Optional<T> run(Callable<T> c) throws Exception {
if (!opsLock.tryLock()) if (!opsLock.tryLock())
return Optional.empty(); return Optional.empty();
try { try {
return Optional.of(c.call()); return Optional.of(c.call());
} }

View File

@ -24,9 +24,9 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.query.limit.QueryLimits; import nu.marginalia.index.query.limit.QueryLimits;
import nu.marginalia.index.query.limit.QueryStrategy; import nu.marginalia.index.query.limit.QueryStrategy;
import nu.marginalia.index.query.limit.SpecificationLimit; import nu.marginalia.index.query.limit.SpecificationLimit;
import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.linkdb.DocumentDbReader;
import nu.marginalia.linkdb.LinkdbWriter; import nu.marginalia.linkdb.DocumentDbWriter;
import nu.marginalia.linkdb.model.LdbUrlDetail; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.idx.WordFlags; import nu.marginalia.model.idx.WordFlags;
@ -51,6 +51,7 @@ import java.sql.SQLException;
import java.util.*; import java.util.*;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@ -80,7 +81,7 @@ public class IndexQueryServiceIntegrationSmokeTest {
DomainRankings domainRankings; DomainRankings domainRankings;
@Inject @Inject
LinkdbReader linkdbReader; DocumentDbReader documentDbReader;
@Inject @Inject
ProcessHeartbeat processHeartbeat; ProcessHeartbeat processHeartbeat;
@ -103,15 +104,15 @@ public class IndexQueryServiceIntegrationSmokeTest {
@Test @Test
public void willItBlend() throws Exception { public void willItBlend() throws Exception {
var linkdbWriter = new LinkdbWriter( var linkdbWriter = new DocumentDbWriter(
IndexLocations.getLinkdbLivePath(fileStorageService) IndexLocations.getLinkdbLivePath(fileStorageService)
.resolve("links.db") .resolve(DOCDB_FILE_NAME)
); );
for (int i = 1; i < 512; i++) { for (int i = 1; i < 512; i++) {
loadData(linkdbWriter, i); loadData(linkdbWriter, i);
} }
linkdbWriter.close(); linkdbWriter.close();
linkdbReader.reconnect(); documentDbReader.reconnect();
indexJournalWriter.close(); indexJournalWriter.close();
constructIndex(); constructIndex();
@ -146,15 +147,15 @@ public class IndexQueryServiceIntegrationSmokeTest {
@Test @Test
public void testDomainQuery() throws Exception { public void testDomainQuery() throws Exception {
var linkdbWriter = new LinkdbWriter( var linkdbWriter = new DocumentDbWriter(
IndexLocations.getLinkdbLivePath(fileStorageService) IndexLocations.getLinkdbLivePath(fileStorageService)
.resolve("links.db") .resolve(DOCDB_FILE_NAME)
); );
for (int i = 1; i < 512; i++) { for (int i = 1; i < 512; i++) {
loadDataWithDomain(linkdbWriter, i/100, i); loadDataWithDomain(linkdbWriter, i/100, i);
} }
linkdbWriter.close(); linkdbWriter.close();
linkdbReader.reconnect(); documentDbReader.reconnect();
indexJournalWriter.close(); indexJournalWriter.close();
constructIndex(); constructIndex();
@ -183,15 +184,15 @@ public class IndexQueryServiceIntegrationSmokeTest {
@Test @Test
public void testYearQuery() throws Exception { public void testYearQuery() throws Exception {
var linkdbWriter = new LinkdbWriter( var linkdbWriter = new DocumentDbWriter(
IndexLocations.getLinkdbLivePath(fileStorageService) IndexLocations.getLinkdbLivePath(fileStorageService)
.resolve("links.db") .resolve(DOCDB_FILE_NAME)
); );
for (int i = 1; i < 512; i++) { for (int i = 1; i < 512; i++) {
loadData(linkdbWriter, i); loadData(linkdbWriter, i);
} }
linkdbWriter.close(); linkdbWriter.close();
linkdbReader.reconnect(); documentDbReader.reconnect();
indexJournalWriter.close(); indexJournalWriter.close();
constructIndex(); constructIndex();
@ -283,7 +284,7 @@ public class IndexQueryServiceIntegrationSmokeTest {
MurmurHash3_128 hasher = new MurmurHash3_128(); MurmurHash3_128 hasher = new MurmurHash3_128();
@SneakyThrows @SneakyThrows
public void loadData(LinkdbWriter ldbw, int id) { public void loadData(DocumentDbWriter ldbw, int id) {
int[] factors = IntStream int[] factors = IntStream
.rangeClosed(1, id) .rangeClosed(1, id)
.filter(v -> (id % v) == 0) .filter(v -> (id % v) == 0)
@ -299,7 +300,7 @@ public class IndexQueryServiceIntegrationSmokeTest {
data[2 * i + 1] = new WordMetadata(i, EnumSet.of(WordFlags.Title)).encode(); data[2 * i + 1] = new WordMetadata(i, EnumSet.of(WordFlags.Title)).encode();
} }
ldbw.add(new LdbUrlDetail( ldbw.add(new DocdbUrlDetail(
fullId, new EdgeUrl("https://www.example.com/"+id), fullId, new EdgeUrl("https://www.example.com/"+id),
"test", "test", 0., "HTML5", 0, null, 0, 10 "test", "test", 0., "HTML5", 0, null, 0, 10
)); ));
@ -308,7 +309,7 @@ public class IndexQueryServiceIntegrationSmokeTest {
} }
@SneakyThrows @SneakyThrows
public void loadDataWithDomain(LinkdbWriter ldbw, int domain, int id) { public void loadDataWithDomain(DocumentDbWriter ldbw, int domain, int id) {
int[] factors = IntStream.rangeClosed(1, id).filter(v -> (id % v) == 0).toArray(); int[] factors = IntStream.rangeClosed(1, id).filter(v -> (id % v) == 0).toArray();
long fullId = UrlIdCodec.encodeId(domain, id); long fullId = UrlIdCodec.encodeId(domain, id);
var header = new IndexJournalEntryHeader(factors.length, 0, fullId, DocumentMetadata.defaultValue()); var header = new IndexJournalEntryHeader(factors.length, 0, fullId, DocumentMetadata.defaultValue());
@ -319,7 +320,7 @@ public class IndexQueryServiceIntegrationSmokeTest {
data[2*i + 1] = new WordMetadata(i, EnumSet.of(WordFlags.Title)).encode(); data[2*i + 1] = new WordMetadata(i, EnumSet.of(WordFlags.Title)).encode();
} }
ldbw.add(new LdbUrlDetail( ldbw.add(new DocdbUrlDetail(
fullId, new EdgeUrl("https://www.example.com/"+id), fullId, new EdgeUrl("https://www.example.com/"+id),
"test", "test", 0., "HTML5", 0, null, 0, 10 "test", "test", 0., "HTML5", 0, null, 0, 10
)); ));

View File

@ -23,9 +23,9 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.query.limit.QueryLimits; import nu.marginalia.index.query.limit.QueryLimits;
import nu.marginalia.index.query.limit.QueryStrategy; import nu.marginalia.index.query.limit.QueryStrategy;
import nu.marginalia.index.query.limit.SpecificationLimit; import nu.marginalia.index.query.limit.SpecificationLimit;
import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.linkdb.DocumentDbReader;
import nu.marginalia.linkdb.LinkdbWriter; import nu.marginalia.linkdb.DocumentDbWriter;
import nu.marginalia.linkdb.model.LdbUrlDetail; import nu.marginalia.linkdb.model.DocdbUrlDetail;
import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.PubDate; import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.id.UrlIdCodec;
@ -53,6 +53,7 @@ import java.sql.SQLException;
import java.util.*; import java.util.*;
import java.util.function.Function; import java.util.function.Function;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@ -84,7 +85,7 @@ public class IndexQueryServiceIntegrationTest {
@Inject @Inject
ProcessHeartbeat processHeartbeat; ProcessHeartbeat processHeartbeat;
@Inject @Inject
LinkdbReader linkdbReader; DocumentDbReader documentDbReader;
@BeforeEach @BeforeEach
public void setUp() throws IOException { public void setUp() throws IOException {
@ -566,11 +567,11 @@ public class IndexQueryServiceIntegrationTest {
indexJournalWriter.put(header, entry); indexJournalWriter.put(header, entry);
}); });
var linkdbWriter = new LinkdbWriter( var linkdbWriter = new DocumentDbWriter(
IndexLocations.getLinkdbLivePath(fileStorageService).resolve("links.db") IndexLocations.getLinkdbLivePath(fileStorageService).resolve(DOCDB_FILE_NAME)
); );
for (Long key : allData.keySet()) { for (Long key : allData.keySet()) {
linkdbWriter.add(new LdbUrlDetail( linkdbWriter.add(new DocdbUrlDetail(
key, key,
new EdgeUrl("https://www.example.com"), new EdgeUrl("https://www.example.com"),
"test", "test",
@ -587,7 +588,7 @@ public class IndexQueryServiceIntegrationTest {
indexJournalWriter.close(); indexJournalWriter.close();
constructIndex(); constructIndex();
linkdbReader.reconnect(); documentDbReader.reconnect();
searchIndex.switchIndex(); searchIndex.switchIndex();
} }
} }

View File

@ -7,7 +7,7 @@ import nu.marginalia.storage.model.FileStorageBase;
import nu.marginalia.storage.model.FileStorageBaseType; import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.journal.writer.IndexJournalWriter;
import nu.marginalia.index.journal.writer.IndexJournalWriterPagingImpl; import nu.marginalia.index.journal.writer.IndexJournalWriterPagingImpl;
import nu.marginalia.linkdb.LinkdbReader; import nu.marginalia.linkdb.DocumentDbReader;
import nu.marginalia.process.control.FakeProcessHeartbeat; import nu.marginalia.process.control.FakeProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.ranking.DomainRankings; import nu.marginalia.ranking.DomainRankings;
@ -26,6 +26,7 @@ import java.sql.SQLException;
import java.util.Random; import java.util.Random;
import java.util.UUID; import java.util.UUID;
import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
public class IndexQueryServiceIntegrationTestModule extends AbstractModule { public class IndexQueryServiceIntegrationTestModule extends AbstractModule {
@ -57,9 +58,9 @@ public class IndexQueryServiceIntegrationTestModule extends AbstractModule {
Mockito.when(fileStorageServiceMock.getStorageBase(FileStorageBaseType.CURRENT)).thenReturn(new FileStorageBase(null, null, 0,null, fastDir.toString())); Mockito.when(fileStorageServiceMock.getStorageBase(FileStorageBaseType.CURRENT)).thenReturn(new FileStorageBase(null, null, 0,null, fastDir.toString()));
Mockito.when(fileStorageServiceMock.getStorageBase(FileStorageBaseType.STORAGE)).thenReturn(new FileStorageBase(null, null, 0, null, fastDir.toString())); Mockito.when(fileStorageServiceMock.getStorageBase(FileStorageBaseType.STORAGE)).thenReturn(new FileStorageBase(null, null, 0, null, fastDir.toString()));
bind(LinkdbReader.class).toInstance(new LinkdbReader( bind(DocumentDbReader.class).toInstance(new DocumentDbReader(
IndexLocations.getLinkdbLivePath(fileStorageServiceMock) IndexLocations.getLinkdbLivePath(fileStorageServiceMock)
.resolve("links.db") .resolve(DOCDB_FILE_NAME)
)); ));
bind(FileStorageService.class).toInstance(fileStorageServiceMock); bind(FileStorageService.class).toInstance(fileStorageServiceMock);

View File

@ -0,0 +1,96 @@
package nu.marginalia.query;
import com.google.inject.Inject;
import io.grpc.ManagedChannel;
import io.grpc.stub.StreamObserver;
import nu.marginalia.index.api.IndexDomainLinksApiGrpc;
import nu.marginalia.index.api.RpcDomainIdCount;
import nu.marginalia.index.api.RpcDomainIdList;
import nu.marginalia.index.api.RpcDomainIdPairs;
import nu.marginalia.query.svc.NodeConfigurationWatcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** Query-service facade for the per-node domain link APIs.
 * <p>
 * Each RPC fans out to every index node via a stub pool and aggregates the
 * partial responses into a single reply for the caller.
 */
public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase {
    private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class);
    private final NodeConfigurationWatcher nodeConfigurationWatcher;
    private final QueryGrpcStubPool<IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub> stubPool;

    @Inject
    public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) {
        this.nodeConfigurationWatcher = nodeConfigurationWatcher;

        // One blocking stub per configured index node, managed by the pool
        stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) {
            @Override
            public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) {
                return IndexDomainLinksApiGrpc.newBlockingStub(channel);
            }
        };
    }

    /** Streams all link pairs from every node, relaying each node's
     *  batches to the caller verbatim. */
    @Override
    public void getAllLinks(nu.marginalia.index.api.Empty request,
                            StreamObserver<RpcDomainIdPairs> responseObserver) {
        stubPool.callEachSequential(stub -> stub.getAllLinks(request))
                .forEach(batches -> batches.forEachRemaining(responseObserver::onNext));

        responseObserver.onCompleted();
    }

    /** Aggregates outgoing-link id lists from all nodes into one response. */
    @Override
    public void getLinksFromDomain(nu.marginalia.index.api.RpcDomainId request,
                                   StreamObserver<RpcDomainIdList> responseObserver) {
        var aggregated = RpcDomainIdList.newBuilder();

        stubPool.callEachSequential(stub -> stub.getLinksFromDomain(request))
                .forEach(partial -> aggregated.addAllDomainId(partial.getDomainIdList()));

        responseObserver.onNext(aggregated.build());
        responseObserver.onCompleted();
    }

    /** Aggregates incoming-link id lists from all nodes into one response. */
    @Override
    public void getLinksToDomain(nu.marginalia.index.api.RpcDomainId request,
                                 StreamObserver<RpcDomainIdList> responseObserver) {
        var aggregated = RpcDomainIdList.newBuilder();

        stubPool.callEachSequential(stub -> stub.getLinksToDomain(request))
                .forEach(partial -> aggregated.addAllDomainId(partial.getDomainIdList()));

        responseObserver.onNext(aggregated.build());
        responseObserver.onCompleted();
    }

    /** Sums the outgoing-link counts reported by every node. */
    @Override
    public void countLinksFromDomain(nu.marginalia.index.api.RpcDomainId request,
                                     StreamObserver<RpcDomainIdCount> responseObserver) {
        int total = stubPool.callEachSequential(stub -> stub.countLinksFromDomain(request))
                .mapToInt(RpcDomainIdCount::getIdCount)
                .sum();

        responseObserver.onNext(countMessage(total));
        responseObserver.onCompleted();
    }

    /** Sums the incoming-link counts reported by every node. */
    @Override
    public void countLinksToDomain(nu.marginalia.index.api.RpcDomainId request,
                                   io.grpc.stub.StreamObserver<nu.marginalia.index.api.RpcDomainIdCount> responseObserver) {
        int total = stubPool.callEachSequential(stub -> stub.countLinksToDomain(request))
                .mapToInt(RpcDomainIdCount::getIdCount)
                .sum();

        responseObserver.onNext(countMessage(total));
        responseObserver.onCompleted();
    }

    /** Wraps a summed link count in its protobuf message form. */
    private static RpcDomainIdCount countMessage(int count) {
        return RpcDomainIdCount.newBuilder().setIdCount(count).build();
    }
}

View File

@ -2,7 +2,6 @@ package nu.marginalia.query;
import com.google.inject.Inject; import com.google.inject.Inject;
import io.grpc.ManagedChannel; import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.prometheus.client.Histogram; import io.prometheus.client.Histogram;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.db.DomainBlacklist; import nu.marginalia.db.DomainBlacklist;
@ -10,7 +9,6 @@ import nu.marginalia.index.api.*;
import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.NodeConfigurationWatcher;
import nu.marginalia.query.svc.QueryFactory; import nu.marginalia.query.svc.QueryFactory;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -28,32 +26,7 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
.help("QS-side query time (GRPC endpoint)") .help("QS-side query time (GRPC endpoint)")
.register(); .register();
private final Map<ServiceAndNode, ManagedChannel> channels private final QueryGrpcStubPool<IndexApiGrpc.IndexApiBlockingStub> stubPool;
= new ConcurrentHashMap<>();
private final Map<ServiceAndNode, IndexApiGrpc.IndexApiBlockingStub> actorRpcApis
= new ConcurrentHashMap<>();
private ManagedChannel getChannel(ServiceAndNode serviceAndNode) {
return channels.computeIfAbsent(serviceAndNode,
san -> ManagedChannelBuilder
.forAddress(serviceAndNode.getHostName(), 81)
.usePlaintext()
.build());
}
public IndexApiGrpc.IndexApiBlockingStub indexApi(int node) {
return actorRpcApis.computeIfAbsent(new ServiceAndNode("index-service", node), n ->
IndexApiGrpc.newBlockingStub(
getChannel(n)
)
);
}
record ServiceAndNode(String service, int node) {
public String getHostName() {
return service+"-"+node;
}
}
private final QueryFactory queryFactory; private final QueryFactory queryFactory;
private final DomainBlacklist blacklist; private final DomainBlacklist blacklist;
@ -64,6 +37,13 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
this.queryFactory = queryFactory; this.queryFactory = queryFactory;
this.blacklist = blacklist; this.blacklist = blacklist;
this.nodeConfigurationWatcher = nodeConfigurationWatcher; this.nodeConfigurationWatcher = nodeConfigurationWatcher;
stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) {
@Override
public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) {
return IndexApiGrpc.newBlockingStub(channel);
}
};
} }
public void query(nu.marginalia.index.api.RpcQsQuery request, public void query(nu.marginalia.index.api.RpcQsQuery request,
@ -89,7 +69,6 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
responseBuilder.setDomain(query.domain); responseBuilder.setDomain(query.domain);
responseObserver.onNext(responseBuilder.build()); responseObserver.onNext(responseBuilder.build());
responseObserver.onCompleted(); responseObserver.onCompleted();
}); });
} catch (Exception e) { } catch (Exception e) {
@ -98,16 +77,13 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
} }
} }
private final ExecutorService es = Executors.newVirtualThreadPerTaskExecutor();
private static final Comparator<RpcDecoratedResultItem> comparator = private static final Comparator<RpcDecoratedResultItem> comparator =
Comparator.comparing(RpcDecoratedResultItem::getRankingScore); Comparator.comparing(RpcDecoratedResultItem::getRankingScore);
@SneakyThrows @SneakyThrows
private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) { private List<RpcDecoratedResultItem> executeQueries(RpcIndexQuery indexRequest, int totalSize) {
List<Callable<List<RpcDecoratedResultItem>>> tasks = createTasks(indexRequest); return stubPool.invokeAll(stub -> new QueryTask(stub, indexRequest))
.stream()
return es.invokeAll(tasks).stream()
.filter(f -> f.state() == Future.State.SUCCESS) .filter(f -> f.state() == Future.State.SUCCESS)
.map(Future::resultNow) .map(Future::resultNow)
.flatMap(List::stream) .flatMap(List::stream)
@ -116,26 +92,30 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase {
.toList(); .toList();
} }
@NotNull private class QueryTask implements Callable<List<RpcDecoratedResultItem>> {
private List<Callable<List<RpcDecoratedResultItem>>> createTasks(RpcIndexQuery indexRequest) { private final IndexApiGrpc.IndexApiBlockingStub stub;
List<Callable<List<RpcDecoratedResultItem>>> tasks = new ArrayList<>(); private final RpcIndexQuery indexRequest;
for (var node : nodeConfigurationWatcher.getQueryNodes()) { public QueryTask(IndexApiGrpc.IndexApiBlockingStub stub, RpcIndexQuery indexRequest) {
tasks.add(() -> { this.stub = stub;
var responseIter = indexApi(node).query(indexRequest); this.indexRequest = indexRequest;
var ret = new ArrayList<RpcDecoratedResultItem>();
while (responseIter.hasNext()) {
RpcDecoratedResultItem next = responseIter.next();
if (isBlacklisted(next))
continue;
ret.add(next);
}
return ret;
});
} }
return tasks;
}
@Override
public List<RpcDecoratedResultItem> call() {
var rsp = stub.query(indexRequest);
List<RpcDecoratedResultItem> ret = new ArrayList<>();
while (rsp.hasNext()) {
RpcDecoratedResultItem next = rsp.next();
if (isBlacklisted(next))
continue;
ret.add(next);
}
return ret;
}
}
private boolean isBlacklisted(RpcDecoratedResultItem item) { private boolean isBlacklisted(RpcDecoratedResultItem item) {
return blacklist.isBlacklisted(UrlIdCodec.getDomainId(item.getRawItem().getCombinedId())); return blacklist.isBlacklisted(UrlIdCodec.getDomainId(item.getRawItem().getCombinedId()));

View File

@ -0,0 +1,64 @@
package nu.marginalia.query;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import nu.marginalia.query.svc.NodeConfigurationWatcher;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Stream;
/** A pool of gRPC stubs for the query partitions, one stub per index node.
 * <p>
 * Channels and stubs are created lazily on first use and cached. The set of
 * nodes to fan out over is taken from the {@link NodeConfigurationWatcher}
 * at call time, so it tracks configuration changes between invocations.
 *
 * @param <STUB> the concrete blocking/async stub type produced by {@link #createStub(ManagedChannel)}
 */
public abstract class QueryGrpcStubPool<STUB> {
    protected record ServiceAndNode(String service, int node) {
        public String getHostName() {
            return service+"-"+node;
        }
    }

    private final NodeConfigurationWatcher nodeConfigurationWatcher;

    // Caches are keyed by (service, node); ConcurrentHashMap so concurrent
    // queries can share channels without external locking.
    private final Map<ServiceAndNode, ManagedChannel> channels = new ConcurrentHashMap<>();
    private final Map<ServiceAndNode, STUB> actorRpcApis = new ConcurrentHashMap<>();

    // One virtual thread per fan-out task; cheap enough that no pooling is needed.
    private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor();

    QueryGrpcStubPool(NodeConfigurationWatcher nodeConfigurationWatcher) {
        this.nodeConfigurationWatcher = nodeConfigurationWatcher;
    }

    /** Get an API stub for the given node, creating and caching
     * the underlying channel and stub on first use. */
    public STUB indexApi(int node) {
        var san = new ServiceAndNode("index-service", node);
        return actorRpcApis.computeIfAbsent(san, n ->
                createStub(channels.computeIfAbsent(n, this::createChannel))
        );
    }

    /** Open a plaintext channel to the node's well-known gRPC port (81).
     * Overridable for testing or alternative addressing schemes. */
    protected ManagedChannel createChannel(ServiceAndNode serviceAndNode) {
        return ManagedChannelBuilder.forAddress(serviceAndNode.getHostName(), 81).usePlaintext().build();
    }

    /** Invoke a function on each node, returning a list of futures in a terminal state, as per
     * ExecutorService$invokeAll
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public <T> List<Future<T>> invokeAll(Function<STUB, Callable<T>> callF) throws InterruptedException {
        List<Callable<T>> calls = nodeConfigurationWatcher.getQueryNodes().stream()
                .map(id -> callF.apply(indexApi(id)))
                .toList();

        return virtualExecutorService.invokeAll(calls);
    }

    /** Invoke a function on each node sequentially on the calling thread,
     * returning a lazy stream of results */
    public <T> Stream<T> callEachSequential(Function<STUB, T> call) {
        return nodeConfigurationWatcher.getQueryNodes().stream()
                .map(id -> call.apply(indexApi(id)));
    }

    /** Create a stub for the given channel, this is an operation
     * that needs to be implemented for the particular API this
     * pool is intended for
     */
    public abstract STUB createStub(ManagedChannel channel);

    /** Release pooled resources: initiates an orderly shutdown of the task
     * executor and every cached gRPC channel.  Without this, each cached
     * {@link ManagedChannel} keeps its transport threads and sockets alive
     * for the lifetime of the JVM.  Safe to call more than once. */
    public void shutDown() {
        virtualExecutorService.shutdown();

        for (var channel : channels.values()) {
            channel.shutdown();
        }

        channels.clear();
        actorRpcApis.clear();
    }
}

View File

@ -42,6 +42,7 @@ public class QueryService extends Service {
public QueryService(BaseServiceParams params, public QueryService(BaseServiceParams params,
IndexClient indexClient, IndexClient indexClient,
NodeConfigurationWatcher nodeWatcher, NodeConfigurationWatcher nodeWatcher,
QueryGRPCDomainLinksService domainLinksService,
QueryGRPCService queryGRPCService, QueryGRPCService queryGRPCService,
Gson gson, Gson gson,
DomainBlacklist blacklist, DomainBlacklist blacklist,
@ -55,6 +56,7 @@ public class QueryService extends Service {
var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1) var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1)
.addService(queryGRPCService) .addService(queryGRPCService)
.addService(domainLinksService)
.build(); .build();
grpcServer.start(); grpcServer.start();