diff --git a/code/api/index-api/src/main/protobuf/index-api.proto b/code/api/index-api/src/main/protobuf/index-api.proto index a53842d4..e7b5ae69 100644 --- a/code/api/index-api/src/main/protobuf/index-api.proto +++ b/code/api/index-api/src/main/protobuf/index-api.proto @@ -4,6 +4,28 @@ package actorapi; option java_package="nu.marginalia.index.api"; option java_multiple_files=true; +service IndexDomainLinksApi { + rpc getAllLinks(Empty) returns (stream RpcDomainIdPairs) {} + rpc getLinksFromDomain(RpcDomainId) returns (RpcDomainIdList) {} + rpc getLinksToDomain(RpcDomainId) returns (RpcDomainIdList) {} + rpc countLinksFromDomain(RpcDomainId) returns (RpcDomainIdCount) {} + rpc countLinksToDomain(RpcDomainId) returns (RpcDomainIdCount) {} +} + +message RpcDomainId { + int32 domainId = 1; +} +message RpcDomainIdList { + repeated int32 domainId = 1 [packed=true]; +} +message RpcDomainIdCount { + int32 idCount = 1; +} +message RpcDomainIdPairs { + repeated int32 sourceIds = 1 [packed=true]; + repeated int32 destIds = 2 [packed=true]; +} + service QueryApi { rpc query(RpcQsQuery) returns (RpcQsResponse) {} } diff --git a/code/api/query-api/build.gradle b/code/api/query-api/build.gradle index 524d21df..ed893ae1 100644 --- a/code/api/query-api/build.gradle +++ b/code/api/query-api/build.gradle @@ -20,8 +20,10 @@ dependencies { implementation libs.bundles.slf4j + implementation libs.roaringbitmap implementation libs.prometheus implementation libs.notnull + implementation libs.trove implementation libs.guice implementation libs.rxjava implementation libs.gson diff --git a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java index 37308576..6c6e63a4 100644 --- a/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java +++ b/code/api/query-api/src/main/java/nu/marginalia/query/client/QueryClient.java @@ -2,24 +2,33 @@ package nu.marginalia.query.client; import com.google.inject.Inject; import com.google.inject.Singleton; +import gnu.trove.list.array.TIntArrayList; import io.grpc.ManagedChannel; import io.grpc.ManagedChannelBuilder; import io.prometheus.client.Summary; import nu.marginalia.client.AbstractDynamicClient; import nu.marginalia.client.Context; +import nu.marginalia.index.api.Empty; +import nu.marginalia.index.api.IndexDomainLinksApiGrpc; import nu.marginalia.index.api.QueryApiGrpc; +import nu.marginalia.index.api.RpcDomainId; import nu.marginalia.index.client.model.query.SearchSpecification; import nu.marginalia.index.client.model.results.SearchResultSet; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.query.QueryProtobufCodec; import nu.marginalia.query.model.QueryParams; import nu.marginalia.query.model.QueryResponse; +import nu.marginalia.service.descriptor.ServiceDescriptor; import nu.marginalia.service.descriptor.ServiceDescriptors; import nu.marginalia.service.id.ServiceId; +import org.roaringbitmap.PeekableCharIterator; +import org.roaringbitmap.longlong.PeekableLongIterator; +import org.roaringbitmap.longlong.Roaring64Bitmap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.CheckReturnValue; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -36,13 +45,15 @@ public class QueryClient extends AbstractDynamicClient { .register(); private final Map channels = new ConcurrentHashMap<>(); - private final Map queryApis = new ConcurrentHashMap<>(); + private final Map queryIndexApis = new ConcurrentHashMap<>(); + private final Map domainLinkApis = new ConcurrentHashMap<>(); record ServiceAndNode(String service, int node) { public String getHostName() { return service; } } + private ManagedChannel getChannel(ServiceAndNode serviceAndNode) { return channels.computeIfAbsent(serviceAndNode, san -> ManagedChannelBuilder @@ -52,13 +63,21 @@ public class QueryClient extends AbstractDynamicClient { } public QueryApiGrpc.QueryApiBlockingStub queryApi(int node) { - return queryApis.computeIfAbsent(new ServiceAndNode("query-service", node), n -> + return queryIndexApis.computeIfAbsent(new ServiceAndNode("query-service", node), n -> QueryApiGrpc.newBlockingStub( getChannel(n) ) ); } + public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub domainApi(int node) { + return domainLinkApis.computeIfAbsent(new ServiceAndNode("query-service", node), n -> + IndexDomainLinksApiGrpc.newBlockingStub( + getChannel(n) + ) + ); + } + private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject @@ -66,6 +85,9 @@ public class QueryClient extends AbstractDynamicClient { super(descriptors.forId(ServiceId.Query), GsonFactory::get); } + public QueryClient() { + super(new ServiceDescriptor(ServiceId.Query, "query-service"), GsonFactory::get); + } /** Delegate an Index API style query directly to the index service */ @CheckReturnValue @@ -82,4 +104,101 @@ public class QueryClient extends AbstractDynamicClient { ); } + public AllLinks getAllDomainLinks() { + AllLinks links = new AllLinks(); + + domainApi(0).getAllLinks(Empty.newBuilder().build()).forEachRemaining(pairs -> { + for (int i = 0; i < pairs.getDestIdsCount(); i++) { + links.add(pairs.getSourceIds(i), pairs.getDestIds(i)); + } + }); + + return links; + } + + public List getLinksToDomain(int domainId) { + try { + return domainApi(0).getLinksToDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getDomainIdList(); + } + catch (Exception e) { + logger.error("API Exception", e); + return List.of(); + } + } + + public List getLinksFromDomain(int domainId) { + try { + return domainApi(0).getLinksFromDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getDomainIdList(); + } + catch (Exception e) { + logger.error("API Exception", e); + return List.of(); + } + } + + public int countLinksToDomain(int domainId) { + try { + return domainApi(0).countLinksToDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getIdCount(); + } + catch (Exception e) { + logger.error("API Exception", e); + return 0; + } + } + + public int countLinksFromDomain(int domainId) { + try { + return domainApi(0).countLinksFromDomain(RpcDomainId + .newBuilder() + .setDomainId(domainId) + .build()) + .getIdCount(); + } + catch (Exception e) { + logger.error("API Exception", e); + return 0; + } + } + public static class AllLinks { + private final Roaring64Bitmap sourceToDest = new Roaring64Bitmap(); + + public void add(int source, int dest) { + sourceToDest.add(Integer.toUnsignedLong(source) << 32 | Integer.toUnsignedLong(dest)); + } + + public Iterator iterator() { + return new Iterator(); + } + + public class Iterator { + private final PeekableLongIterator base = sourceToDest.getLongIterator(); + long val = Long.MIN_VALUE; + + public boolean advance() { + if (base.hasNext()) { + val = base.next(); + return true; + } + return false; + } + public int source() { + return (int) (val >>> 32); + } + public int dest() { + return (int) (val & 0xFFFF_FFFFL); + } + } + } } diff --git a/code/common/linkdb/build.gradle b/code/common/linkdb/build.gradle index 1f223144..08cd4db0 100644 --- a/code/common/linkdb/build.gradle +++ b/code/common/linkdb/build.gradle @@ -16,6 +16,7 @@ configurations { dependencies { implementation project(':code:common:model') + implementation project(':code:common:service') implementation libs.bundles.slf4j @@ -23,6 +24,7 @@ dependencies { implementation libs.bundles.gson implementation libs.notnull + implementation libs.bundles.mariadb implementation libs.sqlite implementation libs.commons.lang3 diff --git a/code/common/linkdb/readme.md b/code/common/linkdb/readme.md index 567ec746..ab86b931 100644 --- a/code/common/linkdb/readme.md +++ b/code/common/linkdb/readme.md @@ -1,11 +1,30 @@ -The link database contains information about links, +## Domain Link Database + +The domain link database contains information about links +between domains. It is a static in-memory database loaded +from a binary file. + +* [DomainLinkDb](src/main/java/nu/marginalia/linkdb/DomainLinkDb.java) +* * [FileDomainLinkDb](src/main/java/nu/marginalia/linkdb/FileDomainLinkDb.java) +* * [SqlDomainLinkDb](src/main/java/nu/marginalia/linkdb/SqlDomainLinkDb.java) +* [DomainLinkDbWriter](src/main/java/nu/marginalia/linkdb/DomainLinkDbWriter.java) +* [DomainLinkDbLoader](src/main/java/nu/marginalia/linkdb/DomainLinkDbLoader.java) + +## Document Database + +The document database contains information about links, such as their ID, their URL, their title, their description, and so forth. -The link database is a sqlite file. The reason this information +The document database is a sqlite file. The reason this information is not in the MariaDB database is that this would make updates to this information take effect in production immediately, even before the information was searchable. -It is constructed by the [loading-process](../../processes/loading-process), and consumed -by the [index-service](../../services-core/index-service). \ No newline at end of file +* [DocumentLinkDbWriter](src/main/java/nu/marginalia/linkdb/DocumentDbWriter.java) +* [DocumentLinkDbLoader](src/main/java/nu/marginalia/linkdb/DocumentDbReader.java) + + +## See Also + +These databases are constructed by the [loading-process](../../processes/loading-process), and consumed by the [index-service](../../services-core/index-service). \ No newline at end of file diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbReader.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DocumentDbReader.java similarity index 91% rename from code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbReader.java rename to code/common/linkdb/src/main/java/nu/marginalia/linkdb/DocumentDbReader.java index 027b2371..6d7aefd6 100644 --- a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbReader.java +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DocumentDbReader.java @@ -4,7 +4,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import com.google.inject.name.Named; import gnu.trove.list.TLongList; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.id.UrlIdCodec; import org.slf4j.Logger; @@ -23,21 +23,21 @@ import java.util.ArrayList; import java.util.List; @Singleton -public class LinkdbReader { +public class DocumentDbReader { private final Path dbFile; private volatile Connection connection; private final Logger logger = LoggerFactory.getLogger(getClass()); @Inject - public LinkdbReader(@Named("linkdb-file") Path dbFile) throws SQLException { + public DocumentDbReader(@Named("docdb-file") Path dbFile) throws SQLException { this.dbFile = dbFile; if (Files.exists(dbFile)) { connection = createConnection(); } else { - logger.warn("No linkdb file {}", dbFile); + logger.warn("No docdb file {}", dbFile); } } @@ -107,8 +107,8 @@ public class LinkdbReader { return ret; } - public List getUrlDetails(TLongList ids) throws SQLException { - List ret = new ArrayList<>(ids.size()); + public List getUrlDetails(TLongList ids) throws SQLException { + List ret = new ArrayList<>(ids.size()); if (connection == null || connection.isClosed()) @@ -126,7 +126,7 @@ public class LinkdbReader { var rs = stmt.executeQuery(); if (rs.next()) { var url = new EdgeUrl(rs.getString("URL")); - ret.add(new LdbUrlDetail( + ret.add(new DocdbUrlDetail( rs.getLong("ID"), url, rs.getString("TITLE"), diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbWriter.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DocumentDbWriter.java similarity index 83% rename from code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbWriter.java rename to code/common/linkdb/src/main/java/nu/marginalia/linkdb/DocumentDbWriter.java index fa9cad7e..88277e9d 100644 --- a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbWriter.java +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DocumentDbWriter.java @@ -1,24 +1,23 @@ package nu.marginalia.linkdb; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import java.io.IOException; import java.nio.file.Path; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.sql.Types; import java.util.List; -public class LinkdbWriter { +public class DocumentDbWriter { private final Connection connection; - public LinkdbWriter(Path outputFile) throws SQLException { + public DocumentDbWriter(Path outputFile) throws SQLException { String connStr = "jdbc:sqlite:" + outputFile.toString(); connection = DriverManager.getConnection(connStr); - try (var stream = ClassLoader.getSystemResourceAsStream("db/linkdb-document.sql"); + try (var stream = ClassLoader.getSystemResourceAsStream("db/docdb-document.sql"); var stmt = connection.createStatement() ) { var sql = new String(stream.readAllBytes()); @@ -31,11 +30,11 @@ public class LinkdbWriter { } } - public void add(LdbUrlDetail ldbUrlDetail) throws SQLException { - add(List.of(ldbUrlDetail)); + public void add(DocdbUrlDetail docdbUrlDetail) throws SQLException { + add(List.of(docdbUrlDetail)); } - public void add(List ldbUrlDetail) throws SQLException { + public void add(List docdbUrlDetail) throws SQLException { try (var stmt = connection.prepareStatement(""" INSERT OR IGNORE INTO DOCUMENT(ID, URL, TITLE, DESCRIPTION, WORDS_TOTAL, FORMAT, FEATURES, DATA_HASH, QUALITY, PUB_YEAR) @@ -43,7 +42,7 @@ public class LinkdbWriter { """)) { int i = 0; - for (var document : ldbUrlDetail) { + for (var document : docdbUrlDetail) { var url = document.url(); stmt.setLong(1, document.urlId()); diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDb.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDb.java new file mode 100644 index 00000000..b9af1dea --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDb.java @@ -0,0 +1,39 @@ +package nu.marginalia.linkdb; + +import gnu.trove.list.array.TIntArrayList; + +import java.io.IOException; +import java.nio.file.Path; +import java.sql.SQLException; +import java.util.Arrays; + +/** A database of source-destination pairs of domain IDs. The database is loaded into memory from + * a source. The database is then kept in memory, reloading it upon switchInput(). + */ +public interface DomainLinkDb { + /** Replace the current db file with the provided file. The provided file will be deleted. + * The in-memory database MAY be updated to reflect the change. + * */ + void switchInput(Path filename) throws Exception; + + /** Find all destinations for the given source. */ + TIntArrayList findDestinations(int source); + + /** Count the number of destinations for the given source. */ + int countDestinations(int source); + + /** Find all sources for the given destination. */ + TIntArrayList findSources(int dest); + + + /** Count the number of sources for the given destination. */ + int countSources(int source); + + /** Iterate over all source-destination pairs. */ + void forEach(SourceDestConsumer consumer); + + + interface SourceDestConsumer { + void accept(int source, int dest); + } +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDbLoader.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDbLoader.java new file mode 100644 index 00000000..de8c6d96 --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDbLoader.java @@ -0,0 +1,45 @@ +package nu.marginalia.linkdb; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class DomainLinkDbLoader implements AutoCloseable { + private final DataInputStream stream; + private final Path filename; + + private long nextVal; + + public DomainLinkDbLoader(Path filename) throws IOException { + this.stream = new DataInputStream(Files.newInputStream(filename)); + this.filename = filename; + } + + public int size() throws IOException { + return (int) (Files.size(filename) / 8); + } + + public boolean next() { + try { + nextVal = stream.readLong(); + return true; + } + catch (IOException ex) { + return false; + } + } + + public int getSource() { + return (int) (nextVal >>> 32); + } + public int getDest() { + return (int) (nextVal & 0xffff_ffffL); + } + + public void close() throws IOException { + stream.close(); + } + + +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDbWriter.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDbWriter.java new file mode 100644 index 00000000..f275ba01 --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/DomainLinkDbWriter.java @@ -0,0 +1,29 @@ +package nu.marginalia.linkdb; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; + +public class DomainLinkDbWriter implements AutoCloseable { + private final DataOutputStream stream; + + public DomainLinkDbWriter(Path fileName) throws IOException { + this.stream = new DataOutputStream(Files.newOutputStream(fileName, + StandardOpenOption.CREATE, + StandardOpenOption.WRITE, + StandardOpenOption.TRUNCATE_EXISTING) + ); + } + + public void write(int sourceDomainId, int destDomainId) throws IOException { + stream.writeLong(Integer.toUnsignedLong(sourceDomainId) << 32 + | Integer.toUnsignedLong(destDomainId)); + } + + @Override + public void close() throws IOException { + stream.close(); + } +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/FileDomainLinkDb.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/FileDomainLinkDb.java new file mode 100644 index 00000000..53f53417 --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/FileDomainLinkDb.java @@ -0,0 +1,125 @@ +package nu.marginalia.linkdb; + +import com.google.inject.name.Named; +import gnu.trove.list.array.TIntArrayList; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Arrays; + +/** Canonical DomainLinkDb implementation. The database is loaded into memory from + * a file. The database is then kept in memory, reloading it upon switchInput(). + */ +public class FileDomainLinkDb implements DomainLinkDb { + private static final Logger logger = LoggerFactory.getLogger(FileDomainLinkDb.class); + private final Path filename; + private volatile long[] sourceToDest = new long[0]; + private volatile long[] destToSource = new long[0]; + + public FileDomainLinkDb(@Named("domain-linkdb-file") Path filename) throws IOException { + this.filename = filename; + if (Files.exists(filename)) { + switchInput(filename); + } + } + + @Override + public void switchInput(Path newFilename) throws IOException { + Files.move(newFilename, filename, StandardCopyOption.REPLACE_EXISTING); + loadInput(filename); + } + + public void loadInput(Path filename) throws IOException { + try (var loader = new DomainLinkDbLoader(filename)) { + int size = loader.size(); + + var newSourceToDest = new long[size]; + var newDestToSource = new long[size]; + + int i = 0; + while (loader.next()) { + long source = loader.getSource(); + long dest = loader.getDest(); + + newSourceToDest[i] = (source << 32) | dest; + newDestToSource[i] = (dest << 32) | source; + + i++; + } + + Arrays.sort(newSourceToDest); + Arrays.sort(newDestToSource); + + sourceToDest = newSourceToDest; + destToSource = newDestToSource; + } + } + + @Override + public TIntArrayList findDestinations(int source) { + return findRelated(sourceToDest, source); + } + + @Override + public TIntArrayList findSources(int dest) { + return findRelated(destToSource, dest); + } + + @Override + public int countDestinations(int source) { + return countRelated(sourceToDest, source); + } + + @Override + public int countSources(int dest) { + return countRelated(destToSource, dest); + } + + @Override + public void forEach(SourceDestConsumer consumer) { + for (long val : sourceToDest) { + consumer.accept((int) (val >>> 32), (int) (val & 0xFFFF_FFFFL)); + } + } + + private TIntArrayList findRelated(long[] range, int key) { + long keyLong = Integer.toUnsignedLong(key) << 32; + long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32; + + int start = Arrays.binarySearch(range, keyLong); + + if (start < 0) { + // Key is not found, get the insertion point + start = -start - 1; + } + + TIntArrayList result = new TIntArrayList(); + + for (int i = start; i < range.length && range[i] < nextKeyLong; i++) { + result.add((int) (range[i] & 0xFFFF_FFFFL)); + } + + return result; + } + + private int countRelated(long[] range, int key) { + long keyLong = Integer.toUnsignedLong(key) << 32; + long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32; + + int start = Arrays.binarySearch(range, keyLong); + + if (start < 0) { + // Key is not found, get the insertion point + start = -start - 1; + } + + int num = 0; + for (int i = start; i < range.length && range[i] < nextKeyLong; i++, num++); + return num; + } + +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbFileNames.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbFileNames.java new file mode 100644 index 00000000..a39769d2 --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/LinkdbFileNames.java @@ -0,0 +1,7 @@ +package nu.marginalia.linkdb; + +public class LinkdbFileNames { + public static String DEPRECATED_LINKDB_FILE_NAME = "links.db"; + public static String DOCDB_FILE_NAME = "documents.db"; + public static String DOMAIN_LINKS_FILE_NAME = "domain-links.dat"; +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/SqlDomainLinkDb.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/SqlDomainLinkDb.java new file mode 100644 index 00000000..4a98eaa9 --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/SqlDomainLinkDb.java @@ -0,0 +1,158 @@ +package nu.marginalia.linkdb; + +import com.google.inject.name.Named; +import com.zaxxer.hikari.HikariDataSource; +import gnu.trove.list.array.TIntArrayList; +import gnu.trove.list.array.TLongArrayList; +import nu.marginalia.service.module.ServiceConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.Arrays; + +/** DomainLinkDb implementation that goes through the motions of + * being a File-backed DomainLinkDb, but actually uses the legacy SQL database + * for loading the data. + *

+ * This is part of the migration path to using FileDomainLinkDb. + */ +public class SqlDomainLinkDb implements DomainLinkDb { + private volatile long[] sourceToDest = new long[0]; + private volatile long[] destToSource = new long[0]; + private static final Logger logger = LoggerFactory.getLogger(SqlDomainLinkDb.class); + + private final Path filename; + private final HikariDataSource dataSource; + private final int node; + + public SqlDomainLinkDb(@Named("domain-linkdb-file") Path filename, + HikariDataSource dataSource, + ServiceConfiguration configuration) + { + this.filename = filename; + this.dataSource = dataSource; + + node = configuration.node(); + + Thread.ofPlatform().start(() -> { + try { + loadDb(); + } catch (Exception e) { + logger.error("Failed to load linkdb", e); + } + }); + } + + @Override + public void switchInput(Path newFilename) throws IOException { + Files.move(newFilename, filename, StandardCopyOption.REPLACE_EXISTING); + + loadDb(); + } + + public void loadDb() { + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement( + STR.""" + SELECT + SOURCE_DOMAIN_ID, + DEST_DOMAIN_ID + FROM EC_DOMAIN_LINK + INNER JOIN EC_DOMAIN + ON EC_DOMAIN.ID = EC_DOMAIN_LINK.SOURCE_DOMAIN_ID + WHERE NODE_AFFINITY=\{node} + """); + var rs = stmt.executeQuery()) + { + TLongArrayList sourceToDest = new TLongArrayList(10_000_000); + TLongArrayList destToSource = new TLongArrayList(10_000_000); + + while (rs.next()) { + long source = Integer.toUnsignedLong(rs.getInt(1)); + long dest = Integer.toUnsignedLong(rs.getInt(2)); + + sourceToDest.add((source << 32) | dest); + destToSource.add((dest << 32) | source); + } + + sourceToDest.sort(); + destToSource.sort(); + + this.sourceToDest = sourceToDest.toArray(); + this.destToSource = destToSource.toArray(); + } + catch (Exception ex) { + logger.error("Failed to load linkdb", ex); + } + + logger.info("LinkDB loaded, size = {}", sourceToDest.length); + } + + @Override + public TIntArrayList findDestinations(int source) { + return findRelated(sourceToDest, source); + } + + @Override + public TIntArrayList findSources(int dest) { + return findRelated(destToSource, dest); + } + + @Override + public int countDestinations(int source) { + return countRelated(sourceToDest, source); + } + + @Override + public int countSources(int dest) { + return countRelated(destToSource, dest); + } + + @Override + public void forEach(SourceDestConsumer consumer) { + for (long val : sourceToDest) { + consumer.accept((int) (val >>> 32), (int) (val & 0xFFFF_FFFFL)); + } + } + + private TIntArrayList findRelated(long[] range, int key) { + long keyLong = Integer.toUnsignedLong(key) << 32; + long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32; + + int start = Arrays.binarySearch(range, keyLong); + + if (start < 0) { + // Key is not found, get the insertion point + start = -start - 1; + } + + TIntArrayList result = new TIntArrayList(); + + for (int i = start; i < range.length && range[i] < nextKeyLong; i++) { + result.add((int) (range[i] & 0xFFFF_FFFFL)); + } + + return result; + } + + private int countRelated(long[] range, int key) { + long keyLong = Integer.toUnsignedLong(key) << 32; + long nextKeyLong = Integer.toUnsignedLong(key + 1) << 32; + + int start = Arrays.binarySearch(range, keyLong); + + if (start < 0) { + // Key is not found, get the insertion point + start = -start - 1; + } + + int num = 0; + for (int i = start; i < range.length && range[i] < nextKeyLong; i++, num++); + return num; + } + +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/model/DocdbUrlDetail.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/model/DocdbUrlDetail.java new file mode 100644 index 00000000..a360571b --- /dev/null +++ b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/model/DocdbUrlDetail.java @@ -0,0 +1,18 @@ +package nu.marginalia.linkdb.model; + +import nu.marginalia.model.EdgeUrl; + +public record DocdbUrlDetail(long urlId, + EdgeUrl url, + String title, + String description, + double urlQuality, + String format, + int features, + Integer pubYear, + long dataHash, + int wordsTotal + ) + +{ +} diff --git a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/model/LdbUrlDetail.java b/code/common/linkdb/src/main/java/nu/marginalia/linkdb/model/LdbUrlDetail.java deleted file mode 100644 index 9b743c9c..00000000 --- a/code/common/linkdb/src/main/java/nu/marginalia/linkdb/model/LdbUrlDetail.java +++ /dev/null @@ -1,18 +0,0 @@ -package nu.marginalia.linkdb.model; - -import nu.marginalia.model.EdgeUrl; - -public record LdbUrlDetail(long urlId, - EdgeUrl url, - String title, - String description, - double urlQuality, - String format, - int features, - Integer pubYear, - long dataHash, - int wordsTotal - ) - -{ -} diff --git a/code/common/linkdb/src/main/resources/db/linkdb-document.sql b/code/common/linkdb/src/main/resources/db/docdb-document.sql similarity index 100% rename from code/common/linkdb/src/main/resources/db/linkdb-document.sql rename to code/common/linkdb/src/main/resources/db/docdb-document.sql diff --git a/code/common/linkdb/src/test/java/nu/marginalia/linkdb/LinkdbWriterTest.java b/code/common/linkdb/src/test/java/nu/marginalia/linkdb/DocumentDbWriterTest.java similarity index 76% rename from code/common/linkdb/src/test/java/nu/marginalia/linkdb/LinkdbWriterTest.java rename to code/common/linkdb/src/test/java/nu/marginalia/linkdb/DocumentDbWriterTest.java index 598e6b67..b28b5ed4 100644 --- a/code/common/linkdb/src/test/java/nu/marginalia/linkdb/LinkdbWriterTest.java +++ b/code/common/linkdb/src/test/java/nu/marginalia/linkdb/DocumentDbWriterTest.java @@ -1,7 +1,7 @@ package nu.marginalia.linkdb; import gnu.trove.list.array.TLongArrayList; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import nu.marginalia.model.EdgeDomain; import org.junit.jupiter.api.Test; @@ -10,13 +10,13 @@ import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; -public class LinkdbWriterTest { +public class DocumentDbWriterTest { @Test public void testCreate() throws IOException { - Path tempPath = Files.createTempFile("linkdb", ".db"); + Path tempPath = Files.createTempFile("docdb", ".db"); try { - var writer = new LinkdbWriter(tempPath); - writer.add(new LdbUrlDetail( + var writer = new DocumentDbWriter(tempPath); + writer.add(new DocdbUrlDetail( 1, new nu.marginalia.model.EdgeUrl("http", new EdgeDomain("example.com"), null, "/", null), "Test", @@ -30,7 +30,7 @@ public class LinkdbWriterTest { )); writer.close(); - var reader = new LinkdbReader(tempPath); + var reader = new DocumentDbReader(tempPath); var deets = reader.getUrlDetails(new TLongArrayList(new long[]{1})); System.out.println(deets); } catch (SQLException e) { diff --git a/code/common/linkdb/src/test/java/nu/marginalia/linkdb/DomainLinkDbTest.java b/code/common/linkdb/src/test/java/nu/marginalia/linkdb/DomainLinkDbTest.java new file mode 100644 index 00000000..1014ba73 --- /dev/null +++ b/code/common/linkdb/src/test/java/nu/marginalia/linkdb/DomainLinkDbTest.java @@ -0,0 +1,50 @@ +package nu.marginalia.linkdb; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +public class DomainLinkDbTest { + Path fileName; + @BeforeEach + public void setUp() throws IOException { + fileName = Files.createTempFile("test", ".db"); + } + @AfterEach + public void tearDown() throws IOException { + Files.deleteIfExists(fileName); + } + + @Test + public void testWriteRead() { + try (var writer = new DomainLinkDbWriter(fileName)) { + writer.write(1, 2); + writer.write(2, 3); + writer.write(3, 4); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + + try (var reader = new DomainLinkDbLoader(fileName)) { + Assertions.assertTrue(reader.next()); + Assertions.assertEquals(1, reader.getSource()); + Assertions.assertEquals(2, reader.getDest()); + Assertions.assertTrue(reader.next()); + Assertions.assertEquals(2, reader.getSource()); + Assertions.assertEquals(3, reader.getDest()); + Assertions.assertTrue(reader.next()); + Assertions.assertEquals(3, reader.getSource()); + Assertions.assertEquals(4, reader.getDest()); + Assertions.assertFalse(reader.next()); + } + catch (IOException ex) { + throw new RuntimeException(ex); + } + } +} diff --git a/code/features-index/domain-ranking/build.gradle b/code/features-index/domain-ranking/build.gradle index bfd613e9..885787eb 100644 --- a/code/features-index/domain-ranking/build.gradle +++ b/code/features-index/domain-ranking/build.gradle @@ -17,6 +17,8 @@ dependencies { implementation project(':code:common:db') implementation project(':code:common:model') implementation project(':code:common:service') + implementation project(':code:common:service-client') + implementation project(':code:api:query-api') implementation libs.bundles.slf4j implementation libs.bundles.mariadb diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcher.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcher.java index 2499d51f..1be9a6e2 100644 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcher.java +++ b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcher.java @@ -5,6 +5,7 @@ import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; import nu.marginalia.db.DomainBlacklistImpl; import nu.marginalia.model.crawl.DomainIndexingState; +import nu.marginalia.query.client.QueryClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,14 +16,18 @@ import java.util.function.IntConsumer; @Singleton public class RankingDomainFetcher { protected final HikariDataSource dataSource; + private final QueryClient queryClient; protected final DomainBlacklistImpl blacklist; protected final Logger logger = LoggerFactory.getLogger(getClass()); protected boolean getNames = false; @Inject - public RankingDomainFetcher(HikariDataSource dataSource, DomainBlacklistImpl blacklist) { + public RankingDomainFetcher(HikariDataSource dataSource, + QueryClient queryClient, + DomainBlacklistImpl blacklist) { this.dataSource = dataSource; + this.queryClient = queryClient; this.blacklist = blacklist; } @@ -33,10 +38,10 @@ public class RankingDomainFetcher { public void getDomains(Consumer consumer) { String query; if (getNames) { - query = "SELECT EC_DOMAIN.ID,DOMAIN_NAME,DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID INNER JOIN EC_DOMAIN_LINK ON SOURCE_DOMAIN_ID=EC_DOMAIN.ID WHERE ((INDEXED>1 AND IS_ALIVE) OR (INDEXED=1 AND VISITED_URLS=KNOWN_URLS AND GOOD_URLS>0)) AND SOURCE_DOMAIN_ID!=DEST_DOMAIN_ID GROUP BY EC_DOMAIN.ID"; + query = "SELECT EC_DOMAIN.ID,DOMAIN_NAME,DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID WHERE NODE_AFFINITY>0 GROUP BY EC_DOMAIN.ID"; } else { - query = "SELECT EC_DOMAIN.ID,\"\",DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID INNER JOIN EC_DOMAIN_LINK ON SOURCE_DOMAIN_ID=EC_DOMAIN.ID WHERE ((INDEXED>1 AND IS_ALIVE) OR (INDEXED=1 AND VISITED_URLS=KNOWN_URLS AND GOOD_URLS>0)) AND SOURCE_DOMAIN_ID!=DEST_DOMAIN_ID GROUP BY EC_DOMAIN.ID"; + query = "SELECT EC_DOMAIN.ID,\"\",DOMAIN_ALIAS,STATE,KNOWN_URLS FROM EC_DOMAIN INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID WHERE NODE_AFFINITY>0 GROUP BY EC_DOMAIN.ID"; } getDomains(query, consumer); @@ -77,23 +82,14 @@ public class RankingDomainFetcher { } public void eachDomainLink(DomainLinkConsumer consumer) { - try (var conn = dataSource.getConnection(); - var stmt = conn.prepareStatement("SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK")) - { - stmt.setFetchSize(10000); - var rsp = stmt.executeQuery(); + var allLinks = queryClient.getAllDomainLinks(); + var iter = allLinks.iterator(); - while (rsp.next()) { - int src = rsp.getInt(1); - int dst = rsp.getInt(2); - - consumer.accept(src, dst); - } - } - catch (SQLException ex) { - logger.error("Failed to fetch domain links", ex); + while (iter.advance()) { + consumer.accept(iter.source(), iter.dest()); } + } public void domainsByPattern(String pattern, IntConsumer idConsumer) { diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcherForSimilarityData.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcherForSimilarityData.java index eccb87ad..ae801166 100644 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcherForSimilarityData.java +++ b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/data/RankingDomainFetcherForSimilarityData.java @@ -4,6 +4,7 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; import nu.marginalia.db.DomainBlacklistImpl; +import nu.marginalia.query.client.QueryClient; import org.slf4j.LoggerFactory; import java.sql.SQLException; @@ -14,8 +15,8 @@ public class RankingDomainFetcherForSimilarityData extends RankingDomainFetcher final boolean hasData; @Inject - public RankingDomainFetcherForSimilarityData(HikariDataSource dataSource, DomainBlacklistImpl blacklist) { - super(dataSource, blacklist); + public RankingDomainFetcherForSimilarityData(HikariDataSource dataSource, QueryClient queryClient, DomainBlacklistImpl blacklist) { + super(dataSource, queryClient, blacklist); hasData = isDomainNeighborTablePopulated(dataSource); } @@ -61,17 +62,6 @@ public class RankingDomainFetcherForSimilarityData extends RankingDomainFetcher } public void getDomains(Consumer consumer) { -// String query = -// """ -// SELECT EC_DOMAIN.ID,DOMAIN_NAME,DOMAIN_ALIAS,STATE,COALESCE(KNOWN_URLS, 0) -// FROM EC_DOMAIN -// LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID -// INNER JOIN EC_DOMAIN_LINK ON DEST_DOMAIN_ID=EC_DOMAIN.ID -// WHERE SOURCE_DOMAIN_ID!=DEST_DOMAIN_ID -// GROUP BY EC_DOMAIN.ID -// HAVING COUNT(SOURCE_DOMAIN_ID)>5 -// """; - String query; if (getNames) { query = diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/CreateBrowseDomainRanksTool.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/CreateBrowseDomainRanksTool.java deleted file mode 100644 index 17b2e195..00000000 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/CreateBrowseDomainRanksTool.java +++ /dev/null @@ -1,71 +0,0 @@ -package nu.marginalia.ranking.tool; - -import com.zaxxer.hikari.HikariDataSource; -import lombok.SneakyThrows; -import nu.marginalia.db.DomainBlacklistImpl; -import nu.marginalia.ranking.StandardPageRank; -import nu.marginalia.ranking.accumulator.RankingResultListAccumulator; -import nu.marginalia.ranking.data.RankingDomainFetcherForSimilarityData; -import nu.marginalia.service.module.DatabaseModule; -import org.mariadb.jdbc.Driver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.concurrent.LinkedBlockingQueue; - -public class CreateBrowseDomainRanksTool { - - private static final Logger logger = LoggerFactory.getLogger(CreateBrowseDomainRanksTool.class); - - - static final LinkedBlockingQueue uploadQueue = new LinkedBlockingQueue<>(10); - volatile static boolean running = true; - - @SneakyThrows - public static void main(String... args) { - Driver driver = new Driver(); - var conn = new DatabaseModule().provideConnection(); - - long start = System.currentTimeMillis(); - var uploader = new Thread(() -> uploadThread(conn), "Uploader"); - - logger.info("Ranking"); - var ds = new DatabaseModule().provideConnection(); - var domains = new RankingDomainFetcherForSimilarityData(ds, new DomainBlacklistImpl(ds)); - var rpr = new StandardPageRank(domains, args); - - uploader.start(); - - var rankData = rpr.pageRankWithPeripheralNodes(1000, RankingResultListAccumulator::new); - - rankData.forEach(i -> { - try { - uploadQueue.put(i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return true; - }); - - long end = System.currentTimeMillis(); - running = false; - uploader.join(); - - logger.info("Done in {}", (end - start)/1000.0); - } - - public static void uploadThread(HikariDataSource dataSource) { - try (var conn = dataSource.getConnection()) { - try (var stmt = conn.prepareStatement("INSERT IGNORE INTO EC_RANDOM_DOMAINS(DOMAIN_SET, DOMAIN_ID) VALUES (3, ?)")) { - while (running || (!running && !uploadQueue.isEmpty())) { - var job = uploadQueue.take(); - stmt.setInt(1, job); - stmt.executeUpdate(); - } - } - } catch (SQLException | InterruptedException throwables) { - throwables.printStackTrace(); - } - } -} diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/PerusePageRankV2.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/PerusePageRankV2.java deleted file mode 100644 index be64a4e2..00000000 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/PerusePageRankV2.java +++ /dev/null @@ -1,264 +0,0 @@ -package nu.marginalia.ranking.tool; - - -import com.zaxxer.hikari.HikariDataSource; -import gnu.trove.list.TIntList; -import gnu.trove.list.array.TIntArrayList; -import gnu.trove.map.hash.TIntIntHashMap; -import gnu.trove.map.hash.TIntObjectHashMap; -import it.unimi.dsi.fastutil.ints.IntArrays; -import it.unimi.dsi.fastutil.ints.IntComparator; -import lombok.AllArgsConstructor; -import lombok.SneakyThrows; -import nu.marginalia.ranking.RankingAlgorithm; -import nu.marginalia.ranking.data.RankingDomainData; -import nu.marginalia.ranking.data.RankingDomainFetcher; -import nu.marginalia.db.DomainBlacklistImpl; -import nu.marginalia.service.module.DatabaseModule; -import org.jetbrains.annotations.NotNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.Arrays; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.IntStream; - -public class PerusePageRankV2 { - - final TIntObjectHashMap domainsById = new TIntObjectHashMap<>(); - final TIntIntHashMap domainIndexToId = new TIntIntHashMap(); - final TIntIntHashMap domainIdToIndex = new TIntIntHashMap(); - - TIntArrayList[] linkDataSrc2Dest; - TIntArrayList[] linkDataDest2Src; - - private final Logger logger = LoggerFactory.getLogger(getClass()); - - static final LinkedBlockingQueue uploadQueue = new LinkedBlockingQueue<>(10); - volatile static boolean running = true; - - public int indexMax() { - return domainIndexToId.size(); - } - - public int getDomainId(int idx) { - return domainIndexToId.get(idx); - } - - @SneakyThrows - public static void main(String... args) { - var ds = new DatabaseModule().provideConnection(); - var blacklist = new DomainBlacklistImpl(ds); - var rank = new PerusePageRankV2(new RankingDomainFetcher(ds, blacklist)); - - long start = System.currentTimeMillis(); - var uploader = new Thread(() -> uploadThread(ds)); - uploader.start(); - - IntStream.range(0, rank.indexMax()).parallel().forEach(i -> { - int[] ids = rank.pageRank(i, 25).toArray(); - try { - uploadQueue.put(new LinkAdjacencies(rank.getDomainId(i), ids)); - } catch (InterruptedException e) { - e.printStackTrace(); - } - }); - - long end = System.currentTimeMillis(); - running = false; - uploader.join(); - System.out.printf("%2.2f", (end - start)/1000.0); - } - - @AllArgsConstructor - static class LinkAdjacencies { - public final int id; - public final int[] neighbors; - } - - public static void uploadThread(HikariDataSource dataSource) { - try (var conn = dataSource.getConnection()) { - try (var stmt = conn.prepareStatement("INSERT INTO EC_DOMAIN_NEIGHBORS(DOMAIN_ID, NEIGHBOR_ID, ADJ_IDX) VALUES (?,?,?) ON DUPLICATE KEY UPDATE NEIGHBOR_ID=VALUES(NEIGHBOR_ID)")) { - while (running || (!running && !uploadQueue.isEmpty())) { - var job = uploadQueue.take(); - for (int i = 0; i < job.neighbors.length; i++) { - stmt.setInt(1, job.id); - stmt.setInt(2, job.neighbors[i]); - stmt.setInt(3, i); - stmt.addBatch(); - } - stmt.executeBatch(); - } - } - } catch (SQLException | InterruptedException throwables) { - throwables.printStackTrace(); - } - } - - public PerusePageRankV2(RankingDomainFetcher domainFetcher) { - - domainFetcher.getDomains(domainData -> { - int id = domainData.id; - - domainsById.put(id, domainData); - - domainIndexToId.put(domainIndexToId.size(), id); - domainIdToIndex.put(id, domainIdToIndex.size()); - }); - domainFetcher.getPeripheralDomains(domainData -> { - int id = domainData.id; - - domainsById.put(id, domainData); - - domainIndexToId.put(domainIndexToId.size(), id); - domainIdToIndex.put(id, domainIdToIndex.size()); - }); - - linkDataSrc2Dest = new TIntArrayList[domainIndexToId.size()]; - linkDataDest2Src = new TIntArrayList[domainIndexToId.size()]; - - domainFetcher.eachDomainLink((src, dst) -> { - if (src == dst) return; - - if (domainsById.contains(src) && domainsById.contains(dst)) { - - int srcIdx = domainIdToIndex.get(src); - int dstIdx = domainIdToIndex.get(domainsById.get(dst).resolveAlias()); - - if (linkDataSrc2Dest[srcIdx] == null) { - linkDataSrc2Dest[srcIdx] = new TIntArrayList(); - } - linkDataSrc2Dest[srcIdx].add(dstIdx); - - if (linkDataDest2Src[dstIdx] == null) { - linkDataDest2Src[dstIdx] = new TIntArrayList(); - } - linkDataDest2Src[dstIdx].add(srcIdx); - } - }); - } - - public TIntList pageRank(int origin, int resultCount) { - RankVector rank = new RankVector(1.d / domainsById.size()); - - int iter_max = 10; - for (int i = 0; i < iter_max; i++) { - RankVector newRank = createNewRankVector(rank); - - double oldNorm = rank.norm(); - double newNorm = newRank.norm(); - double dNorm = oldNorm - newNorm ; - - newRank.increment(origin, dNorm/oldNorm); - - rank = newRank; - } - - rank.increment(origin, -1); - - return rank.getRanking(resultCount); - } - - @NotNull - private RankVector createNewRankVector(RankVector rank) { - - double rankNorm = rank.norm(); - RankVector newRank = new RankVector(0); - - for (int domainId = 0; domainId < domainIndexToId.size(); domainId++) { - - var links = linkDataSrc2Dest[domainId]; - double newRankValue = 0; - - if (links != null && links.size() > 0) { - - - for (int j = 0; j < links.size(); j++) { - var revLinks = linkDataDest2Src[links.getQuick(j)]; - newRankValue += rank.get(links.getQuick(j)) / revLinks.size(); - } - } - - newRank.set(domainId, 0.85*newRankValue/rankNorm); - } - - return newRank; - } - - public class RankVector { - private final double[] rank; - public RankVector(double defaultValue) { - rank = new double[domainIndexToId.size()]; - if (defaultValue != 0.) { - Arrays.fill(rank, defaultValue); - } - } - - public void set(int id, double value) { - rank[id] = value; - } - - public void increment(int id, double value) { - rank[id] += value; - } - - public double get(int id) { - if (id >= rank.length) return 0.; - - return rank[id]; - } - - public double norm() { - double v = 0.; - for (int i = 0; i < rank.length; i++) { - if (rank[i] > 0) { v+=rank[i]; } - else { v -= rank[i]; } - } - return v; - } - - public double norm(RankingAlgorithm.RankVector other) { - double v = 0.; - for (int i = 0; i < rank.length; i++) { - double dv = rank[i] - other.get(i); - - if (dv > 0) { v+=dv; } - else { v -= dv; } - } - return v; - } - - public TIntList getRanking(int numResults) { - if (numResults < 0) { - numResults = domainIdToIndex.size(); - } - TIntArrayList list = new TIntArrayList(numResults); - - int[] nodes = new int[rank.length]; - Arrays.setAll(nodes, i->i); - IntComparator comp = (i,j) -> (int) Math.signum(rank[j] - rank[i]); - IntArrays.quickSort(nodes, comp); - - int i; - - for (i = 0; i < numResults; i++) { - int id = domainIndexToId.get(nodes[i]); - - if (!domainsById.get(id).isAlias()) - list.add(id); - } - - for (; i < nodes.length && domainsById.size() < numResults; i++) { - int id = domainIndexToId.get(nodes[i]); - - if (!domainsById.get(id).isAlias()) - list.add(id); - } - - - return list; - } - } - -} diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/PrintDomainRanksTool.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/PrintDomainRanksTool.java deleted file mode 100644 index 9877f393..00000000 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/PrintDomainRanksTool.java +++ /dev/null @@ -1,67 +0,0 @@ -package nu.marginalia.ranking.tool; - -import lombok.SneakyThrows; -import nu.marginalia.ranking.accumulator.RankingResultListAccumulator; -import nu.marginalia.ranking.data.RankingDomainFetcher; -import nu.marginalia.db.DomainBlacklistImpl; -import nu.marginalia.ranking.StandardPageRank; -import nu.marginalia.ranking.data.RankingDomainFetcherForSimilarityData; -import nu.marginalia.service.module.DatabaseModule; -import org.mariadb.jdbc.Driver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicInteger; - -public class PrintDomainRanksTool { - - private static final Logger logger = LoggerFactory.getLogger(PrintDomainRanksTool.class); - - private volatile static int rankMax; - - static final LinkedBlockingQueue uploadQueue = new LinkedBlockingQueue<>(10); - volatile static boolean running = true; - - @SneakyThrows - public static void main(String... args) { - Driver driver = new Driver(); - var conn = new DatabaseModule().provideConnection(); - - long start = System.currentTimeMillis(); - - logger.info("Ranking"); - var ds = new DatabaseModule().provideConnection(); - - RankingDomainFetcher domains; - if (Boolean.getBoolean("use-link-data")) { - domains = new RankingDomainFetcher(ds, new DomainBlacklistImpl(ds)); - domains.retainNames(); - } - else { - domains = new RankingDomainFetcherForSimilarityData(ds, new DomainBlacklistImpl(ds)); - domains.retainNames(); - } - - var rpr = new StandardPageRank(domains, args); - - rankMax = rpr.size(); - - var rankData = rpr.pageRankWithPeripheralNodes(rankMax, RankingResultListAccumulator::new); - - AtomicInteger cnt = new AtomicInteger(); - rankData.forEach(i -> { - - var data = rpr.getDomainData(i); - - System.out.printf("%d %s %s\n", cnt.getAndIncrement(), data.name, data.state); - return true; - }); - - long end = System.currentTimeMillis(); - running = false; - - logger.info("Done in {}", (end - start)/1000.0); - } - -} diff --git a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/UpdateDomainRanksTool.java b/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/UpdateDomainRanksTool.java deleted file mode 100644 index 7e57bc8a..00000000 --- a/code/features-index/domain-ranking/src/main/java/nu/marginalia/ranking/tool/UpdateDomainRanksTool.java +++ /dev/null @@ -1,85 +0,0 @@ -package nu.marginalia.ranking.tool; - -import com.zaxxer.hikari.HikariDataSource; -import lombok.SneakyThrows; -import nu.marginalia.ranking.StandardPageRank; -import nu.marginalia.ranking.accumulator.RankingResultListAccumulator; -import nu.marginalia.ranking.data.RankingDomainFetcherForSimilarityData; - -import nu.marginalia.db.DomainBlacklistImpl; -import nu.marginalia.service.module.DatabaseModule; -import org.mariadb.jdbc.Driver; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.concurrent.LinkedBlockingQueue; - -public class UpdateDomainRanksTool { - - private static final Logger logger = LoggerFactory.getLogger(UpdateDomainRanksTool.class); - - private volatile static int rankMax; - - static final LinkedBlockingQueue uploadQueue = new LinkedBlockingQueue<>(10); - volatile static boolean running = true; - - @SneakyThrows - public static void main(String... args) { - Driver driver = new Driver(); - var conn = new DatabaseModule().provideConnection(); - - long start = System.currentTimeMillis(); - var uploader = new Thread(() -> uploadThread(conn), "Uploader"); - - logger.info("Ranking"); - var domains = new RankingDomainFetcherForSimilarityData(conn, new DomainBlacklistImpl(conn)); - var rpr = new StandardPageRank(domains, "memex.marginalia.nu", "bikobatanari.art", "sadgrl.online", "wiki.xxiivv.com"); - - rankMax = rpr.size(); - uploader.start(); - - var rankData = rpr.pageRankWithPeripheralNodes(rankMax, RankingResultListAccumulator::new); - - rankData.forEach(i -> { - try { - uploadQueue.put(i); - } catch (InterruptedException e) { - e.printStackTrace(); - } - return true; - }); - - long end = System.currentTimeMillis(); - running = false; - uploader.join(); - - logger.info("Done in {}", (end - start)/1000.0); - } - - public static void uploadThread(HikariDataSource dataSource) { - int i = 0; - - try (var conn = dataSource.getConnection()) { - logger.info("Resetting rank"); - try (var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET RANK=1")) { - stmt.executeUpdate(); - } - - logger.info("Updating ranks"); - try (var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET RANK=? WHERE ID=?")) { - while (running || (!running && !uploadQueue.isEmpty())) { - var job = uploadQueue.take(); - stmt.setDouble(1, i++ / (double) rankMax); - stmt.setInt(2, job); - stmt.executeUpdate(); - } - } - - logger.info("Recalculating quality"); - - } catch (SQLException | InterruptedException throwables) { - throwables.printStackTrace(); - } - } -} diff --git a/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarCosine.java b/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarCosine.java index 63a276a2..f75a87de 100644 --- a/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarCosine.java +++ b/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarCosine.java @@ -3,7 +3,6 @@ package nu.marginalia.browse; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import gnu.trove.set.hash.TIntHashSet; import nu.marginalia.browse.model.BrowseResult; import nu.marginalia.model.EdgeDomain; import nu.marginalia.db.DomainBlacklist; diff --git a/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarOldAlgo.java b/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarOldAlgo.java deleted file mode 100644 index bf155040..00000000 --- a/code/features-search/random-websites/src/main/java/nu/marginalia/browse/DbBrowseDomainsSimilarOldAlgo.java +++ /dev/null @@ -1,132 +0,0 @@ -package nu.marginalia.browse; - -import com.google.inject.Inject; -import com.google.inject.Singleton; -import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.browse.model.BrowseResult; -import nu.marginalia.model.EdgeDomain; -import nu.marginalia.db.DomainBlacklist; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.SQLException; -import java.util.*; - -@Singleton -public class DbBrowseDomainsSimilarOldAlgo { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final HikariDataSource dataSource; - - @Inject - public DbBrowseDomainsSimilarOldAlgo(HikariDataSource dataSource) { - this.dataSource = dataSource; - } - - public List getDomainNeighborsAdjacent(int domainId, DomainBlacklist blacklist, int count) { - final Set domains = new HashSet<>(count*3); - - final String q = """ - SELECT EC_DOMAIN.ID AS NEIGHBOR_ID, DOMAIN_NAME, COUNT(*) AS CNT, INDEXED - FROM EC_DOMAIN_NEIGHBORS - INNER JOIN EC_DOMAIN ON NEIGHBOR_ID=EC_DOMAIN.ID - INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID - INNER JOIN EC_DOMAIN_LINK ON DEST_DOMAIN_ID=EC_DOMAIN.ID - WHERE - STATE<2 - AND KNOWN_URLS<1000 - AND DOMAIN_ALIAS IS NULL - AND EC_DOMAIN_NEIGHBORS.DOMAIN_ID = ? - GROUP BY EC_DOMAIN.ID - HAVING CNT < 100 - ORDER BY ADJ_IDX - LIMIT ? - """; - - try (var connection = dataSource.getConnection()) { - try (var stmt = connection.prepareStatement(q)) { - stmt.setFetchSize(count); - stmt.setInt(1, domainId); - stmt.setInt(2, count); - var rsp = stmt.executeQuery(); - while (rsp.next()) { - int id = rsp.getInt(1); - String domain = rsp.getString(2); - - if (!blacklist.isBlacklisted(id)) { - domains.add(new BrowseResult(new EdgeDomain(domain).toRootUrl(), id, 0, rsp.getBoolean("INDEXED"))); - } - } - } - - if (domains.size() < count/2) { - final String q2 = """ - SELECT EC_DOMAIN.ID, DOMAIN_NAME, INDEXED - FROM EC_DOMAIN - INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID - INNER JOIN EC_DOMAIN_LINK B ON DEST_DOMAIN_ID=EC_DOMAIN.ID - INNER JOIN EC_DOMAIN_LINK O ON O.DEST_DOMAIN_ID=EC_DOMAIN.ID - WHERE B.SOURCE_DOMAIN_ID=? - AND STATE<2 - AND KNOWN_URLS<1000 - AND DOMAIN_ALIAS IS NULL - GROUP BY EC_DOMAIN.ID - HAVING COUNT(*) < 100 ORDER BY RANK ASC LIMIT ?"""; - try (var stmt = connection.prepareStatement(q2)) { - - stmt.setFetchSize(count/2); - stmt.setInt(1, domainId); - stmt.setInt(2, count/2 - domains.size()); - var rsp = stmt.executeQuery(); - while (rsp.next() && domains.size() < count/2) { - int id = rsp.getInt(1); - String domain = rsp.getString(2); - - if (!blacklist.isBlacklisted(id)) { - domains.add(new BrowseResult(new EdgeDomain(domain).toRootUrl(), id, 0, rsp.getBoolean("INDEXED"))); - } - } - } - } - - if (domains.size() < count/2) { - final String q3 = """ - SELECT EC_DOMAIN.ID, DOMAIN_NAME, INDEXED - FROM EC_DOMAIN - INNER JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID - INNER JOIN EC_DOMAIN_LINK B ON B.SOURCE_DOMAIN_ID=EC_DOMAIN.ID - INNER JOIN EC_DOMAIN_LINK O ON O.DEST_DOMAIN_ID=EC_DOMAIN.ID - WHERE B.DEST_DOMAIN_ID=? - AND STATE<2 - AND KNOWN_URLS<1000 - AND DOMAIN_ALIAS IS NULL - GROUP BY EC_DOMAIN.ID - HAVING COUNT(*) < 100 - ORDER BY RANK ASC - LIMIT ?"""; - try (var stmt = connection.prepareStatement(q3)) { - stmt.setFetchSize(count/2); - stmt.setInt(1, domainId); - stmt.setInt(2, count/2 - domains.size()); - - var rsp = stmt.executeQuery(); - while (rsp.next() && domains.size() < count/2) { - int id = rsp.getInt(1); - String domain = rsp.getString(2); - - if (!blacklist.isBlacklisted(id)) { - domains.add(new BrowseResult(new EdgeDomain(domain).toRootUrl(), id, 0, rsp.getBoolean("INDEXED"))); - } - } - } - } - } catch (SQLException throwables) { - throwables.printStackTrace(); - } - - - return new ArrayList<>(domains); - } - - -} diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java index 98e66c5f..a91678d8 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderMain.java @@ -9,7 +9,7 @@ import lombok.SneakyThrows; import nu.marginalia.ProcessConfiguration; import nu.marginalia.ProcessConfigurationModule; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.linkdb.LinkdbWriter; +import nu.marginalia.linkdb.DocumentDbWriter; import nu.marginalia.loading.documents.DocumentLoaderService; import nu.marginalia.loading.documents.KeywordLoaderService; import nu.marginalia.loading.domains.DomainIdRegistry; @@ -43,7 +43,7 @@ public class LoaderMain { private final ProcessHeartbeatImpl heartbeat; private final MessageQueueFactory messageQueueFactory; private final FileStorageService fileStorageService; - private final LinkdbWriter linkdbWriter; + private final DocumentDbWriter documentDbWriter; private final LoaderIndexJournalWriter journalWriter; private final DomainLoaderService domainService; private final DomainLinksLoaderService linksService; @@ -77,7 +77,7 @@ public class LoaderMain { public LoaderMain(ProcessHeartbeatImpl heartbeat, MessageQueueFactory messageQueueFactory, FileStorageService fileStorageService, - LinkdbWriter linkdbWriter, + DocumentDbWriter documentDbWriter, LoaderIndexJournalWriter journalWriter, DomainLoaderService domainService, DomainLinksLoaderService linksService, @@ -90,7 +90,7 @@ public class LoaderMain { this.heartbeat = heartbeat; this.messageQueueFactory = messageQueueFactory; this.fileStorageService = fileStorageService; - this.linkdbWriter = linkdbWriter; + this.documentDbWriter = documentDbWriter; this.journalWriter = journalWriter; this.domainService = domainService; this.linksService = linksService; @@ -132,7 +132,7 @@ public class LoaderMain { } finally { journalWriter.close(); - linkdbWriter.close(); + documentDbWriter.close(); heartbeat.shutDown(); } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java index abd1d08a..1ba5d9ca 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/LoaderModule.java @@ -9,8 +9,9 @@ import com.google.inject.name.Names; import nu.marginalia.LanguageModels; import nu.marginalia.WmsaHome; import nu.marginalia.IndexLocations; +import nu.marginalia.linkdb.DomainLinkDbWriter; import nu.marginalia.storage.FileStorageService; -import nu.marginalia.linkdb.LinkdbWriter; +import nu.marginalia.linkdb.DocumentDbWriter; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.service.SearchServiceDescriptors; import nu.marginalia.service.descriptor.ServiceDescriptors; @@ -20,6 +21,9 @@ import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; +import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME; +import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME; + public class LoaderModule extends AbstractModule { public LoaderModule() { @@ -34,14 +38,26 @@ public class LoaderModule extends AbstractModule { } @Inject @Provides @Singleton - private LinkdbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException { - - Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve("links.db"); + private DocumentDbWriter createLinkdbWriter(FileStorageService service) throws SQLException, IOException { + // Migrate + Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve(DOCDB_FILE_NAME); if (Files.exists(dbPath)) { Files.delete(dbPath); } - return new LinkdbWriter(dbPath); + return new DocumentDbWriter(dbPath); + } + + @Inject @Provides @Singleton + private DomainLinkDbWriter createDomainLinkdbWriter(FileStorageService service) throws SQLException, IOException { + + Path dbPath = IndexLocations.getLinkdbWritePath(service).resolve(DOMAIN_LINKS_FILE_NAME); + + if (Files.exists(dbPath)) { + Files.delete(dbPath); + } + + return new DomainLinkDbWriter(dbPath); } private Gson createGson() { diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java index b0c86dcc..bed93d7e 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/documents/DocumentLoaderService.java @@ -4,9 +4,8 @@ import com.google.inject.Inject; import com.google.inject.Singleton; import lombok.SneakyThrows; import nu.marginalia.io.processed.DocumentRecordParquetFileReader; -import nu.marginalia.io.processed.ProcessedDataFileNames; -import nu.marginalia.linkdb.LinkdbWriter; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.DocumentDbWriter; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import nu.marginalia.loading.LoaderInputData; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.EdgeUrl; @@ -26,11 +25,11 @@ import java.util.List; public class DocumentLoaderService { private static final Logger logger = LoggerFactory.getLogger(DocumentLoaderService.class); - private final LinkdbWriter linkdbWriter; + private final DocumentDbWriter documentDbWriter; @Inject - public DocumentLoaderService(LinkdbWriter linkdbWriter) { - this.linkdbWriter = linkdbWriter; + public DocumentLoaderService(DocumentDbWriter documentDbWriter) { + this.documentDbWriter = documentDbWriter; } public boolean loadDocuments( @@ -73,7 +72,7 @@ public class DocumentLoaderService { class LinkdbLoader implements AutoCloseable { private final DomainIdRegistry domainIdRegistry; - private final List details = new ArrayList<>(1000); + private final List details = new ArrayList<>(1000); LinkdbLoader(DomainIdRegistry domainIdRegistry) { this.domainIdRegistry = domainIdRegistry; @@ -88,7 +87,7 @@ public class DocumentLoaderService { projection.ordinal ); - details.add(new LdbUrlDetail( + details.add(new DocdbUrlDetail( urlId, new EdgeUrl(projection.url), projection.title, @@ -102,7 +101,7 @@ public class DocumentLoaderService { )); if (details.size() > 100) { - linkdbWriter.add(details); + documentDbWriter.add(details); details.clear(); } @@ -111,7 +110,7 @@ public class DocumentLoaderService { @Override public void close() throws SQLException { if (!details.isEmpty()) { - linkdbWriter.add(details); + documentDbWriter.add(details); } } } diff --git a/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java b/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java index 6f3a6d8f..272b3936 100644 --- a/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java +++ b/code/processes/loading-process/src/main/java/nu/marginalia/loading/links/DomainLinksLoaderService.java @@ -2,10 +2,9 @@ package nu.marginalia.loading.links; import com.google.inject.Inject; import com.google.inject.Singleton; -import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; +import lombok.SneakyThrows; import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader; -import nu.marginalia.io.processed.ProcessedDataFileNames; +import nu.marginalia.linkdb.DomainLinkDbWriter; import nu.marginalia.loading.LoaderInputData; import nu.marginalia.loading.domains.DomainIdRegistry; import nu.marginalia.model.processed.DomainLinkRecord; @@ -15,28 +14,22 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.nio.file.Path; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; @Singleton public class DomainLinksLoaderService { - private final HikariDataSource dataSource; private static final Logger logger = LoggerFactory.getLogger(DomainLinksLoaderService.class); - private final int nodeId; + + private final DomainLinkDbWriter domainLinkDbWriter; + @Inject - public DomainLinksLoaderService(HikariDataSource dataSource, - ProcessConfiguration processConfiguration) { - this.dataSource = dataSource; - this.nodeId = processConfiguration.node(); + public DomainLinksLoaderService(DomainLinkDbWriter domainLinkDbWriter) { + this.domainLinkDbWriter = domainLinkDbWriter; } public boolean loadLinks(DomainIdRegistry domainIdRegistry, ProcessHeartbeat heartbeat, - LoaderInputData inputData) throws IOException, SQLException { - - dropLinkData(); + LoaderInputData inputData) throws IOException { try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS")) { var linkFiles = inputData.listDomainLinkFiles(); @@ -56,17 +49,7 @@ public class DomainLinksLoaderService { return true; } - private void dropLinkData() throws SQLException { - logger.info("Clearing EC_DOMAIN_LINK"); - - try (var conn = dataSource.getConnection(); - var call = conn.prepareCall("CALL PURGE_LINKS_TABLE(?)")) { - call.setInt(1, nodeId); - call.executeUpdate(); - } - } - - private void loadLinksFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException, SQLException { + private void loadLinksFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException { try (var domainStream = DomainLinkRecordParquetFileReader.stream(file); var linkLoader = new LinkLoader(domainIdRegistry)) { @@ -76,49 +59,21 @@ public class DomainLinksLoaderService { } class LinkLoader implements AutoCloseable { - private final Connection connection; - private final PreparedStatement insertStatement; private final DomainIdRegistry domainIdRegistry; - private int batchSize = 0; - private int total = 0; - - public LinkLoader(DomainIdRegistry domainIdRegistry) throws SQLException { + public LinkLoader(DomainIdRegistry domainIdRegistry) { this.domainIdRegistry = domainIdRegistry; - - connection = dataSource.getConnection(); - insertStatement = connection.prepareStatement(""" - INSERT IGNORE INTO EC_DOMAIN_LINK(SOURCE_DOMAIN_ID, DEST_DOMAIN_ID) - VALUES (?, ?) - """); } + @SneakyThrows void accept(DomainLinkRecord record) { - try { - insertStatement.setInt(1, domainIdRegistry.getDomainId(record.source)); - insertStatement.setInt(2, domainIdRegistry.getDomainId(record.dest)); - insertStatement.addBatch(); - if (++batchSize > 1000) { - batchSize = 0; - insertStatement.executeBatch(); - } - total++; - } - catch (SQLException ex) { - throw new RuntimeException(ex); - } + domainLinkDbWriter.write( + domainIdRegistry.getDomainId(record.source), + domainIdRegistry.getDomainId(record.dest) + ); } @Override - public void close() throws SQLException { - if (batchSize > 0) { - insertStatement.executeBatch(); - } - - logger.info("Inserted {} links", total); - - insertStatement.close(); - connection.close(); - } + public void close() {} } } diff --git a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java b/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java deleted file mode 100644 index 9852b630..00000000 --- a/code/processes/loading-process/src/test/java/nu/marginalia/loading/links/DomainLinksLoaderServiceTest.java +++ /dev/null @@ -1,176 +0,0 @@ -package nu.marginalia.loading.links; - -import com.google.common.collect.Lists; -import com.zaxxer.hikari.HikariConfig; -import com.zaxxer.hikari.HikariDataSource; -import nu.marginalia.ProcessConfiguration; -import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter; -import nu.marginalia.io.processed.DomainRecordParquetFileWriter; -import nu.marginalia.io.processed.ProcessedDataFileNames; -import nu.marginalia.loader.DbTestUtil; -import nu.marginalia.loading.LoaderInputData; -import nu.marginalia.loading.domains.DomainLoaderService; -import nu.marginalia.model.processed.DomainLinkRecord; -import nu.marginalia.model.processed.DomainRecord; -import nu.marginalia.process.control.ProcessAdHocTaskHeartbeat; -import nu.marginalia.process.control.ProcessHeartbeat; -import org.junit.jupiter.api.*; -import org.mockito.Mockito; -import org.testcontainers.containers.MariaDBContainer; -import org.testcontainers.junit.jupiter.Container; -import org.testcontainers.junit.jupiter.Testcontainers; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.sql.SQLException; -import java.util.*; - -import static org.junit.jupiter.api.Assertions.assertEquals; - -@Tag("slow") -@Testcontainers -@Disabled // Error in the SQL loading mechanism, we don't deal with DELIMITER correctly - // which means we can't get around flyway's bugs necessitating DELIMITER. -class DomainLinksLoaderServiceTest { - List toDelete = new ArrayList<>(); - ProcessHeartbeat heartbeat; - - @Container - static MariaDBContainer mariaDBContainer = new MariaDBContainer<>("mariadb") - .withDatabaseName("WMSA_prod") - .withUsername("wmsa") - .withPassword("wmsa") - .withInitScript("db/migration/V23_06_0_000__base.sql") - .withNetworkAliases("mariadb"); - - HikariDataSource dataSource; - - @BeforeEach - public void setUp() { - - HikariConfig config = new HikariConfig(); - config.setJdbcUrl(mariaDBContainer.getJdbcUrl()); - config.setUsername("wmsa"); - config.setPassword("wmsa"); - - dataSource = new HikariDataSource(config); - - List migrations = List.of( - "db/migration/V23_11_0_007__domain_node_affinity.sql", - "db/migration/V23_11_0_008__purge_procedure.sql" - ); - for (String migration : migrations) { - try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration), - "Could not load migration script " + migration); - var conn = dataSource.getConnection(); - var stmt = conn.createStatement() - ) { - String script = new String(resource.readAllBytes()); - String[] cmds = script.split("\\s*;\\s*"); - for (String cmd : cmds) { - if (cmd.isBlank()) - continue; - System.out.println(cmd); - stmt.executeUpdate(cmd); - } - } catch (IOException | SQLException ex) { - - } - } - - heartbeat = Mockito.mock(ProcessHeartbeat.class); - - Mockito.when(heartbeat.createAdHocTaskHeartbeat(Mockito.anyString())).thenReturn( - Mockito.mock(ProcessAdHocTaskHeartbeat.class) - ); - } - - @AfterEach - public void tearDown() throws IOException { - for (var path : Lists.reverse(toDelete)) { - Files.deleteIfExists(path); - } - - toDelete.clear(); - dataSource.close(); - } - - @Test - public void test() throws IOException, SQLException { - Path workDir = Files.createTempDirectory(getClass().getSimpleName()); - Path parquetFile1 = ProcessedDataFileNames.domainFileName(workDir, 0); - Path parquetFile2 = ProcessedDataFileNames.domainLinkFileName(workDir, 0); - Path parquetFile3 = ProcessedDataFileNames.domainLinkFileName(workDir, 1); - - toDelete.add(workDir); - toDelete.add(parquetFile1); - toDelete.add(parquetFile2); - toDelete.add(parquetFile3); - - List domains1 = List.of("www.marginalia.nu", "search.marginalia.nu"); - List linkDomains1 = List.of("wiby.me", "www.mojeek.com", "www.altavista.com"); - List linkDomains2 = List.of("maya.land", "xkcd.com", "aaronsw.com"); - - try (var pw = new DomainRecordParquetFileWriter(parquetFile1)) { - for (var domain : domains1) { - pw.write(dr(domain)); - } - } - try (var pw = new DomainLinkRecordParquetFileWriter(parquetFile2)) { - for (var domain : linkDomains1) { - pw.write(dl("www.marginalia.nu", domain)); - } - } - try (var pw = new DomainLinkRecordParquetFileWriter(parquetFile3)) { - for (var domain : linkDomains2) { - pw.write(dl("search.marginalia.nu", domain)); - } - } - - try (var dataSource = DbTestUtil.getConnection(mariaDBContainer.getJdbcUrl()); - var conn = dataSource.getConnection(); - var query = conn.prepareStatement(""" - SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK - """) - ) { - var domainService = new DomainLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID())); - var input = new LoaderInputData(workDir, 2); - var domainRegistry = domainService.getOrCreateDomainIds(input); - - var dls = new DomainLinksLoaderService(dataSource, new ProcessConfiguration("test", 1, UUID.randomUUID())); - dls.loadLinks(domainRegistry, heartbeat, input); - - Map> expected = new HashMap<>(); - Map> actual = new HashMap<>(); - expected.put(domainRegistry.getDomainId("www.marginalia.nu"), new HashSet<>()); - expected.put(domainRegistry.getDomainId("search.marginalia.nu"), new HashSet<>()); - - for (var domain : linkDomains1) { - expected.get(domainRegistry.getDomainId("www.marginalia.nu")).add(domainRegistry.getDomainId(domain)); - } - for (var domain : linkDomains2) { - expected.get(domainRegistry.getDomainId("search.marginalia.nu")).add(domainRegistry.getDomainId(domain)); - } - - var rs = query.executeQuery(); - while (rs.next()) { - actual.computeIfAbsent(rs.getInt(1), k -> new HashSet<>()) - .add(rs.getInt(2)); - } - - assertEquals(expected, actual); - - } - - - } - - private DomainRecord dr(String domainName) { - return new DomainRecord(domainName, 0, 0, 0, null, null, null, null); - } - - private DomainLinkRecord dl(String sourceDomainName, String destDomainName) { - return new DomainLinkRecord(sourceDomainName, destDomainName); - } -} \ No newline at end of file diff --git a/code/processes/website-adjacencies-calculator/build.gradle b/code/processes/website-adjacencies-calculator/build.gradle index 94378fff..479c2744 100644 --- a/code/processes/website-adjacencies-calculator/build.gradle +++ b/code/processes/website-adjacencies-calculator/build.gradle @@ -21,7 +21,9 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:db') implementation project(':code:common:process') + implementation project(':code:common:service-client') implementation project(':code:common:service') + implementation project(':code:api:query-api') implementation libs.bundles.slf4j diff --git a/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/AdjacenciesData.java b/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/AdjacenciesData.java index 4ddc087b..61c2ceee 100644 --- a/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/AdjacenciesData.java +++ b/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/AdjacenciesData.java @@ -1,26 +1,25 @@ package nu.marginalia.adjacencies; -import com.zaxxer.hikari.HikariDataSource; import gnu.trove.list.TIntList; import gnu.trove.list.array.TIntArrayList; import gnu.trove.map.hash.TIntObjectHashMap; import gnu.trove.set.hash.TIntHashSet; +import nu.marginalia.query.client.QueryClient; import org.roaringbitmap.RoaringBitmap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import java.sql.ResultSet; -import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; public class AdjacenciesData { - TIntList idsList = new TIntArrayList(100_000); - ArrayList itemsList = new ArrayList<>(100_000); + private static final Logger logger = LoggerFactory.getLogger(AdjacenciesData.class); + private final TIntList idsList = new TIntArrayList(100_000); + private final ArrayList itemsList = new ArrayList<>(100_000); - TIntObjectHashMap dToSMap = new TIntObjectHashMap<>(100_000); - TIntObjectHashMap sToDMap = new TIntObjectHashMap<>(100_000); - - RoaringBitmap indexed = new RoaringBitmap(); + private final TIntObjectHashMap dToSMap = new TIntObjectHashMap<>(100_000); + private final TIntObjectHashMap sToDMap = new TIntObjectHashMap<>(100_000); public TIntHashSet getCandidates(SparseBitVector vec) { TIntHashSet ret = new TIntHashSet(); @@ -36,39 +35,31 @@ public class AdjacenciesData { return ret; } - public AdjacenciesData(HikariDataSource dataSource, DomainAliases aliases) throws SQLException { + public AdjacenciesData(QueryClient queryClient, + DomainAliases aliases) { + logger.info("Loading adjacency data"); Map tmpMapDtoS = new HashMap<>(100_000); - try ( - var conn = dataSource.getConnection(); - var indexedStmt = conn.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE INDEXED>0"); - var linksStmt = conn.prepareStatement("SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK")) { - ResultSet rsp; - indexedStmt.setFetchSize(10_000); - rsp = indexedStmt.executeQuery(); - while (rsp.next()) { - indexed.add(rsp.getInt(1)); + int count = 0; + var allLinks = queryClient.getAllDomainLinks(); + for (var iter = allLinks.iterator();;count++) { + if (!iter.advance()) { + break; } + int source = aliases.deAlias(iter.source()); + int dest = aliases.deAlias(iter.dest()); - linksStmt.setFetchSize(10_000); - rsp = linksStmt.executeQuery(); - while (rsp.next()) { - int source = aliases.deAlias(rsp.getInt(1)); - int dest = aliases.deAlias(rsp.getInt(2)); - - tmpMapDtoS.computeIfAbsent(dest, this::createBitmapWithSelf).add(source); - - - RoaringBitmap sToDEntry = sToDMap.get(source); - if (sToDEntry == null) { - sToDEntry = new RoaringBitmap(); - sToDMap.put(source, sToDEntry); - sToDEntry.add(source); - } - sToDEntry.add(dest); + tmpMapDtoS.computeIfAbsent(dest, this::createBitmapWithSelf).add(source); + RoaringBitmap sToDEntry = sToDMap.get(source); + if (sToDEntry == null) { + sToDEntry = new RoaringBitmap(); + sToDMap.put(source, sToDEntry); + sToDEntry.add(source); } + sToDEntry.add(dest); } + logger.info("Links loaded: {}", count); tmpMapDtoS.entrySet().stream() .filter(e -> isEligible(e.getValue())) @@ -79,10 +70,10 @@ public class AdjacenciesData { dToSMap.put(e.getKey(), val); }); + logger.info("All adjacency dat loaded"); } private boolean isEligible(RoaringBitmap value) { -// return true; int cardinality = value.getCardinality(); return cardinality < 10000; @@ -95,10 +86,6 @@ public class AdjacenciesData { return bm; } - public boolean isIndexedDomain(int domainId) { - return indexed.contains(domainId); - } - public TIntList getIdsList() { return idsList; } diff --git a/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java b/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java index 258982df..fbda4856 100644 --- a/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java +++ b/code/processes/website-adjacencies-calculator/src/main/java/nu/marginalia/adjacencies/WebsiteAdjacenciesCalculator.java @@ -7,7 +7,10 @@ import nu.marginalia.db.DbDomainQueries; import nu.marginalia.model.EdgeDomain; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeatImpl; +import nu.marginalia.query.client.QueryClient; import nu.marginalia.service.module.DatabaseModule; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.sql.SQLException; import java.util.*; @@ -23,13 +26,14 @@ public class WebsiteAdjacenciesCalculator { private final HikariDataSource dataSource; public AdjacenciesData adjacenciesData; public DomainAliases domainAliases; + private static final Logger logger = LoggerFactory.getLogger(WebsiteAdjacenciesCalculator.class); float[] weights; - public WebsiteAdjacenciesCalculator(HikariDataSource dataSource) throws SQLException { + public WebsiteAdjacenciesCalculator(QueryClient queryClient, HikariDataSource dataSource) throws SQLException { this.dataSource = dataSource; domainAliases = new DomainAliases(dataSource); - adjacenciesData = new AdjacenciesData(dataSource, domainAliases); + adjacenciesData = new AdjacenciesData(queryClient, domainAliases); weights = adjacenciesData.getWeights(); } @@ -47,7 +51,6 @@ public class WebsiteAdjacenciesCalculator { for (int domainId : domainIds) { findAdjacentDtoS(domainId, similarities -> { for (var similarity : similarities.similarities()) { - if (adjacenciesData.isIndexedDomain(similarity.domainId)) System.out.print("*"); System.out.println(dataStoreDao.getDomain(similarity.domainId).map(Object::toString).orElse("") + " " + prettyPercent(similarity.value)); } }); @@ -186,8 +189,9 @@ public class WebsiteAdjacenciesCalculator { DatabaseModule dm = new DatabaseModule(); var dataSource = dm.provideConnection(); + var qc = new QueryClient(); - var main = new WebsiteAdjacenciesCalculator(dataSource); + var main = new WebsiteAdjacenciesCalculator(qc, dataSource); if (args.length == 1 && "load".equals(args[0])) { var processHeartbeat = new ProcessHeartbeatImpl( @@ -195,9 +199,16 @@ public class WebsiteAdjacenciesCalculator { dataSource ); - processHeartbeat.start(); - main.loadAll(processHeartbeat); - processHeartbeat.shutDown(); + try { + processHeartbeat.start(); + main.loadAll(processHeartbeat); + } + catch (Exception ex) { + logger.error("Failed to load", ex); + } + finally { + processHeartbeat.shutDown(); + } return; } diff --git a/code/services-application/search-service/src/main/java/nu/marginalia/search/command/commands/BrowseCommand.java b/code/services-application/search-service/src/main/java/nu/marginalia/search/command/commands/BrowseCommand.java index 3099c29d..aa6b19ea 100644 --- a/code/services-application/search-service/src/main/java/nu/marginalia/search/command/commands/BrowseCommand.java +++ b/code/services-application/search-service/src/main/java/nu/marginalia/search/command/commands/BrowseCommand.java @@ -66,7 +66,7 @@ public class BrowseCommand implements SearchCommandInterface { return browseService.getRandomEntries(set); } else { - return browseService.getRelatedEntries(word); + return browseService.getRelatedEntries(ctx, word); } } catch (Exception ex) { diff --git a/code/services-application/search-service/src/main/java/nu/marginalia/search/results/BrowseResultCleaner.java b/code/services-application/search-service/src/main/java/nu/marginalia/search/results/BrowseResultCleaner.java index 71a6ad43..7ebefb55 100644 --- a/code/services-application/search-service/src/main/java/nu/marginalia/search/results/BrowseResultCleaner.java +++ b/code/services-application/search-service/src/main/java/nu/marginalia/search/results/BrowseResultCleaner.java @@ -2,6 +2,7 @@ package nu.marginalia.search.results; import com.google.inject.Inject; import com.google.inject.Singleton; +import nu.marginalia.assistant.client.model.SimilarDomain; import nu.marginalia.browse.model.BrowseResult; import nu.marginalia.screenshot.ScreenshotService; @@ -18,7 +19,7 @@ public class BrowseResultCleaner { this.screenshotService = screenshotService; } - public Predicate shouldRemoveResultPredicate() { + public Predicate shouldRemoveResultPredicateBr() { Set domainHashes = new HashSet<>(100); return (res) -> !screenshotService.hasScreenshot(res.domainId()) diff --git a/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchBrowseService.java b/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchBrowseService.java index 08423a4d..187a9081 100644 --- a/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchBrowseService.java +++ b/code/services-application/search-service/src/main/java/nu/marginalia/search/svc/SearchBrowseService.java @@ -1,16 +1,18 @@ package nu.marginalia.search.svc; import com.google.inject.Inject; +import nu.marginalia.assistant.client.AssistantClient; +import nu.marginalia.assistant.client.model.SimilarDomain; import nu.marginalia.browse.DbBrowseDomainsRandom; -import nu.marginalia.browse.DbBrowseDomainsSimilarCosine; -import nu.marginalia.browse.DbBrowseDomainsSimilarOldAlgo; import nu.marginalia.browse.model.BrowseResult; import nu.marginalia.browse.model.BrowseResultSet; +import nu.marginalia.client.Context; import nu.marginalia.db.DbDomainQueries; import nu.marginalia.db.DomainBlacklist; import nu.marginalia.model.EdgeDomain; import nu.marginalia.search.results.BrowseResultCleaner; +import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; @@ -19,55 +21,60 @@ import static java.util.Collections.shuffle; public class SearchBrowseService { private final DbBrowseDomainsRandom randomDomains; - private final DbBrowseDomainsSimilarCosine similarDomains; - private final DbBrowseDomainsSimilarOldAlgo similarDomainsOld; private final DbDomainQueries domainQueries; private final DomainBlacklist blacklist; + private final AssistantClient assistantClient; private final BrowseResultCleaner browseResultCleaner; @Inject public SearchBrowseService(DbBrowseDomainsRandom randomDomains, - DbBrowseDomainsSimilarCosine similarDomains, - DbBrowseDomainsSimilarOldAlgo similarDomainsOld, DbDomainQueries domainQueries, DomainBlacklist blacklist, + AssistantClient assistantClient, BrowseResultCleaner browseResultCleaner) { this.randomDomains = randomDomains; - this.similarDomains = similarDomains; - this.similarDomainsOld = similarDomainsOld; this.domainQueries = domainQueries; this.blacklist = blacklist; + this.assistantClient = assistantClient; this.browseResultCleaner = browseResultCleaner; } public BrowseResultSet getRandomEntries(int set) { List results = randomDomains.getRandomDomains(25, blacklist, set); - results.removeIf(browseResultCleaner.shouldRemoveResultPredicate()); + results.removeIf(browseResultCleaner.shouldRemoveResultPredicateBr()); return new BrowseResultSet(results); } - public BrowseResultSet getRelatedEntries(String word) { - var domain = domainQueries.getDomainId(new EdgeDomain(word)); + public BrowseResultSet getRelatedEntries(Context ctx, String domainName) { + var domain = domainQueries.getDomainId(new EdgeDomain(domainName)); - var neighbors = similarDomains.getDomainNeighborsAdjacentCosineRequireScreenshot(domain, blacklist, 256); - neighbors.removeIf(browseResultCleaner.shouldRemoveResultPredicate()); + var neighbors = assistantClient.similarDomains(ctx, domain, 50).blockingFirst(); + neighbors.removeIf(sd -> !sd.screenshot()); // If the results are very few, supplement with the alternative shitty algorithm if (neighbors.size() < 25) { - Set allNeighbors = new HashSet<>(neighbors); - allNeighbors.addAll(similarDomainsOld.getDomainNeighborsAdjacent(domain, blacklist, 50)); + Set allNeighbors = new HashSet<>(neighbors); + allNeighbors.addAll(assistantClient.linkedDomains(ctx, domain, 50).blockingFirst()); neighbors.clear(); neighbors.addAll(allNeighbors); - neighbors.removeIf(browseResultCleaner.shouldRemoveResultPredicate()); + neighbors.removeIf(sd -> !sd.screenshot()); } + List results = new ArrayList<>(neighbors.size()); + for (SimilarDomain sd : neighbors) { + var resultDomain = domainQueries.getDomain(sd.domainId()); + if (resultDomain.isEmpty()) + continue; + + results.add(new BrowseResult(resultDomain.get().toRootUrl(), sd.domainId(), 0, sd.screenshot())); + } // shuffle the items for a less repetitive experience shuffle(neighbors); - return new BrowseResultSet(neighbors, word); + return new BrowseResultSet(results, domainName); } } diff --git a/code/services-core/assistant-service/build.gradle b/code/services-core/assistant-service/build.gradle index 8609903d..950dc359 100644 --- a/code/services-core/assistant-service/build.gradle +++ b/code/services-core/assistant-service/build.gradle @@ -24,6 +24,7 @@ java { dependencies { implementation project(':third-party:symspell') implementation project(':code:api:assistant-api') + implementation project(':code:api:query-api') implementation project(':code:common:config') implementation project(':code:common:service') implementation project(':code:common:model') diff --git a/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/DomainInformationService.java b/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/DomainInformationService.java index 69c82bdd..b99c3abf 100644 --- a/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/DomainInformationService.java +++ b/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/DomainInformationService.java @@ -5,6 +5,7 @@ import nu.marginalia.geoip.GeoIpDictionary; import nu.marginalia.model.EdgeDomain; import nu.marginalia.db.DbDomainQueries; import nu.marginalia.assistant.client.model.DomainInformation; +import nu.marginalia.query.client.QueryClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -20,6 +21,7 @@ public class DomainInformationService { private final GeoIpDictionary geoIpDictionary; private DbDomainQueries dbDomainQueries; + private final QueryClient queryClient; private HikariDataSource dataSource; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -27,9 +29,11 @@ public class DomainInformationService { public DomainInformationService( DbDomainQueries dbDomainQueries, GeoIpDictionary geoIpDictionary, + QueryClient queryClient, HikariDataSource dataSource) { this.dbDomainQueries = dbDomainQueries; this.geoIpDictionary = geoIpDictionary; + this.queryClient = queryClient; this.dataSource = dataSource; } @@ -80,21 +84,8 @@ public class DomainInformationService { inCrawlQueue = rs.next(); builder.inCrawlQueue(inCrawlQueue); - rs = stmt.executeQuery(STR.""" - SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE DEST_DOMAIN_ID=\{domainId} - """); - if (rs.next()) { - builder.incomingLinks(rs.getInt(1)); - } - - rs = stmt.executeQuery(STR.""" - SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=\{domainId} - """); - if (rs.next()) { - builder.outboundLinks(rs.getInt(1)); - outboundLinks = rs.getInt(1); - } - + builder.incomingLinks(queryClient.countLinksToDomain(domainId)); + builder.outboundLinks(queryClient.countLinksFromDomain(domainId)); rs = stmt.executeQuery(STR.""" SELECT KNOWN_URLS, GOOD_URLS, VISITED_URLS FROM DOMAIN_METADATA WHERE ID=\{domainId} diff --git a/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/SimilarDomainsService.java b/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/SimilarDomainsService.java index e409e7a2..ddcc2e98 100644 --- a/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/SimilarDomainsService.java +++ b/code/services-core/assistant-service/src/main/java/nu/marginalia/assistant/domains/SimilarDomainsService.java @@ -10,6 +10,7 @@ import gnu.trove.set.TIntSet; import gnu.trove.set.hash.TIntHashSet; import nu.marginalia.assistant.client.model.SimilarDomain; import nu.marginalia.model.EdgeDomain; +import nu.marginalia.query.client.QueryClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,14 +26,13 @@ public class SimilarDomainsService { private static final Logger logger = LoggerFactory.getLogger(SimilarDomainsService.class); private final HikariDataSource dataSource; + private final QueryClient queryClient; private volatile TIntIntHashMap domainIdToIdx = new TIntIntHashMap(100_000); private volatile int[] domainIdxToId; public volatile TIntDoubleHashMap[] relatedDomains; public volatile TIntList[] domainNeighbors = null; - public volatile TIntList[] linkStoD = null; - public volatile TIntList[] linkDtoS = null; public volatile BitSet screenshotDomains = null; public volatile BitSet activeDomains = null; public volatile BitSet indexedDomains = null; @@ -42,8 +42,9 @@ public class SimilarDomainsService { volatile boolean isReady = false; @Inject - public SimilarDomainsService(HikariDataSource dataSource) { + public SimilarDomainsService(HikariDataSource dataSource, QueryClient queryClient) { this.dataSource = dataSource; + this.queryClient = queryClient; Executors.newSingleThreadExecutor().submit(this::init); } @@ -70,8 +71,6 @@ public class SimilarDomainsService { domainRanks = new double[domainIdToIdx.size()]; domainNames = new String[domainIdToIdx.size()]; domainNeighbors = new TIntList[domainIdToIdx.size()]; - linkStoD = new TIntList[domainIdToIdx.size()]; - linkDtoS = new TIntList[domainIdToIdx.size()]; screenshotDomains = new BitSet(domainIdToIdx.size()); activeDomains = new BitSet(domainIdToIdx.size()); indexedDomains = new BitSet(domainIdToIdx.size()); @@ -108,27 +107,6 @@ public class SimilarDomainsService { logger.info("Loaded {} related domains", relatedDomains.length); - rs = stmt.executeQuery(""" - SELECT SOURCE_DOMAIN_ID, DEST_DOMAIN_ID FROM EC_DOMAIN_LINK - """); - - while (rs.next()) { - int source = rs.getInt(1); - int dest = rs.getInt(2); - - int sourceIdx = domainIdToIdx.get(source); - int destIdx = domainIdToIdx.get(dest); - - if (linkStoD[sourceIdx] == null) - linkStoD[sourceIdx] = new TIntArrayList(32); - if (linkDtoS[destIdx] == null) - linkDtoS[destIdx] = new TIntArrayList(32); - - linkStoD[sourceIdx].add(destIdx); - linkDtoS[destIdx].add(sourceIdx); - - } - logger.info("Loaded links..."); rs = stmt.executeQuery(""" SELECT EC_DOMAIN.ID, @@ -167,7 +145,6 @@ public class SimilarDomainsService { } logger.info("Loaded {} domains", domainRanks.length); - logger.info("All done!"); isReady = true; } } @@ -272,17 +249,23 @@ public class SimilarDomainsService { } private TIntSet getLinkingIdsDToS(int domainIdx) { - var items = linkDtoS[domainIdx]; - if (items == null) - return new TIntHashSet(); - return new TIntHashSet(items); + var items = new TIntHashSet(); + + for (int id : queryClient.getLinksFromDomain(domainIdxToId[domainIdx])) { + items.add(domainIdToIdx.get(id)); + } + + return items; } private TIntSet getLinkingIdsSToD(int domainIdx) { - var items = linkStoD[domainIdx]; - if (items == null) - return new TIntHashSet(); - return new TIntHashSet(items); + var items = new TIntHashSet(); + + for (int id : queryClient.getLinksToDomain(domainIdxToId[domainIdx])) { + items.add(domainIdToIdx.get(id)); + } + + return items; } public List getLinkingDomains(int domainId, int count) { diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index fa69c62d..8d695415 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -26,6 +26,7 @@ dependencies { implementation project(':code:common:model') implementation project(':code:common:process') implementation project(':code:common:db') + implementation project(':code:common:linkdb') implementation project(':code:common:service') implementation project(':code:common:service-client') diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportDataActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportDataActor.java index 91add3d5..4c41da78 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportDataActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ExportDataActor.java @@ -4,9 +4,6 @@ import com.google.gson.Gson; import com.google.inject.Inject; import com.google.inject.Singleton; import com.zaxxer.hikari.HikariDataSource; -import lombok.AllArgsConstructor; -import lombok.NoArgsConstructor; -import lombok.With; import nu.marginalia.actor.prototype.RecordActorPrototype; import nu.marginalia.actor.state.ActorStep; import nu.marginalia.storage.FileStorageService; diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/svc/BackupService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/svc/BackupService.java index e78c5e2f..2205e5e2 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/svc/BackupService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/svc/BackupService.java @@ -19,6 +19,9 @@ import java.sql.SQLException; import java.time.LocalDateTime; import java.util.List; +import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME; +import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME; + public class BackupService { private final FileStorageService storageService; @@ -26,6 +29,7 @@ public class BackupService { public enum BackupHeartbeatSteps { LINKS, + DOCS, JOURNAL, DONE } @@ -57,8 +61,11 @@ public class BackupService { try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Backup")) { + heartbeat.progress(BackupHeartbeatSteps.DOCS); + backupFileCompressed(DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage.asPath()); + heartbeat.progress(BackupHeartbeatSteps.LINKS); - backupFileCompressed("links.db", linkdbStagingStorage, backupStorage.asPath()); + backupFileCompressed(DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage.asPath()); heartbeat.progress(BackupHeartbeatSteps.JOURNAL); // This file format is already compressed @@ -79,8 +86,11 @@ public class BackupService { var linkdbStagingStorage = IndexLocations.getLinkdbWritePath(storageService); try (var heartbeat = serviceHeartbeat.createServiceTaskHeartbeat(BackupHeartbeatSteps.class, "Restore Backup")) { + heartbeat.progress(BackupHeartbeatSteps.DOCS); + restoreBackupCompressed(DOCDB_FILE_NAME, linkdbStagingStorage, backupStorage); + heartbeat.progress(BackupHeartbeatSteps.LINKS); - restoreBackupCompressed("links.db", linkdbStagingStorage, backupStorage); + restoreBackupCompressed(DOMAIN_LINKS_FILE_NAME, linkdbStagingStorage, backupStorage); heartbeat.progress(BackupHeartbeatSteps.JOURNAL); restoreJournal(indexStagingStorage, backupStorage); diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java index f3b409d9..179df9ec 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexModule.java @@ -4,17 +4,27 @@ import com.google.inject.AbstractModule; import com.google.inject.Provides; import com.google.inject.Singleton; import com.google.inject.name.Named; +import com.zaxxer.hikari.HikariDataSource; +import nu.marginalia.linkdb.DomainLinkDb; +import nu.marginalia.linkdb.FileDomainLinkDb; +import nu.marginalia.linkdb.SqlDomainLinkDb; +import nu.marginalia.service.module.ServiceConfiguration; import nu.marginalia.storage.FileStorageService; import nu.marginalia.IndexLocations; import nu.marginalia.index.config.RankingSettings; import nu.marginalia.WmsaHome; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; +import static nu.marginalia.linkdb.LinkdbFileNames.*; + public class IndexModule extends AbstractModule { - - + private static final Logger logger = LoggerFactory.getLogger(IndexModule.class); public void configure() { } @@ -25,11 +35,49 @@ public class IndexModule extends AbstractModule { return RankingSettings.from(dir); } + @Provides + @Singleton + public DomainLinkDb domainLinkDb ( + FileStorageService storageService, + HikariDataSource dataSource, + ServiceConfiguration serviceConfiguration + ) throws IOException + { + Path path = IndexLocations.getLinkdbLivePath(storageService).resolve(DOMAIN_LINKS_FILE_NAME); + + if (Files.exists(path)) { + logger.info("Using file domain link db {}", path); + return new FileDomainLinkDb(path); + } + else { + logger.warn("Using legacy sql domain link db"); + return new SqlDomainLinkDb(path, dataSource, serviceConfiguration); + } + } @Provides @Singleton - @Named("linkdb-file") - public Path linkdbPath(FileStorageService storageService) throws SQLException { - return IndexLocations.getLinkdbLivePath(storageService).resolve("links.db"); + @Named("docdb-file") + public Path linkdbPath(FileStorageService storageService) throws IOException { + // Migrate from old location + Path migrationMarker = IndexLocations.getLinkdbLivePath(storageService).resolve("migrated-links.db-to-documents.db"); + Path oldPath = IndexLocations.getLinkdbLivePath(storageService).resolve(DEPRECATED_LINKDB_FILE_NAME); + Path newPath = IndexLocations.getLinkdbLivePath(storageService).resolve(DOCDB_FILE_NAME); + + if (Files.exists(oldPath) && !Files.exists(newPath) && !Files.exists(migrationMarker)) { + logger.info("Migrating {} to {}", oldPath, newPath); + + Files.move(oldPath, newPath); + Files.createFile(migrationMarker); + } + + return newPath; + } + + @Provides + @Singleton + @Named("domain-linkdb-file") + public Path domainLinkDbFile(FileStorageService storageService) throws SQLException { + return IndexLocations.getLinkdbLivePath(storageService).resolve(DOMAIN_LINKS_FILE_NAME); } } diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java index c1027ad9..9602b469 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/IndexService.java @@ -6,12 +6,14 @@ import io.grpc.ServerBuilder; import io.reactivex.rxjava3.schedulers.Schedulers; import lombok.SneakyThrows; import nu.marginalia.IndexLocations; +import nu.marginalia.index.svc.IndexDomainLinksService; +import nu.marginalia.linkdb.DomainLinkDb; import nu.marginalia.storage.FileStorageService; import nu.marginalia.index.client.IndexMqEndpoints; import nu.marginalia.index.index.SearchIndex; import nu.marginalia.index.svc.IndexOpsService; import nu.marginalia.index.svc.IndexQueryService; -import nu.marginalia.linkdb.LinkdbReader; +import nu.marginalia.linkdb.DocumentDbReader; import nu.marginalia.model.gson.GsonFactory; import nu.marginalia.service.control.ServiceEventLog; import nu.marginalia.service.server.*; @@ -28,6 +30,8 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.concurrent.TimeUnit; +import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME; +import static nu.marginalia.linkdb.LinkdbFileNames.DOMAIN_LINKS_FILE_NAME; import static spark.Spark.get; public class IndexService extends Service { @@ -38,8 +42,9 @@ public class IndexService extends Service { private final IndexOpsService opsService; private final SearchIndex searchIndex; private final FileStorageService fileStorageService; - private final LinkdbReader linkdbReader; + private final DocumentDbReader documentDbReader; + private final DomainLinkDb domainLinkDb; private final ServiceEventLog eventLog; @@ -49,14 +54,17 @@ public class IndexService extends Service { IndexQueryService indexQueryService, SearchIndex searchIndex, FileStorageService fileStorageService, - LinkdbReader linkdbReader, + DocumentDbReader documentDbReader, + DomainLinkDb domainLinkDb, + IndexDomainLinksService indexDomainLinksService, ServiceEventLog eventLog) throws IOException { super(params); this.opsService = opsService; this.searchIndex = searchIndex; this.fileStorageService = fileStorageService; - this.linkdbReader = linkdbReader; + this.documentDbReader = documentDbReader; + this.domainLinkDb = domainLinkDb; this.eventLog = eventLog; final Gson gson = GsonFactory.get(); @@ -65,6 +73,7 @@ public class IndexService extends Service { var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1) .addService(indexQueryService) + .addService(indexDomainLinksService) .build(); grpcServer.start(); @@ -99,15 +108,24 @@ public class IndexService extends Service { @SneakyThrows @MqRequest(endpoint = IndexMqEndpoints.SWITCH_LINKDB) public void switchLinkdb(String unusedArg) { - logger.info("Switching link database"); + logger.info("Switching link databases"); - Path newPath = IndexLocations + Path newPathDocs = IndexLocations .getLinkdbWritePath(fileStorageService) - .resolve("links.db"); + .resolve(DOCDB_FILE_NAME); - if (Files.exists(newPath)) { - eventLog.logEvent("INDEX-SWITCH-LINKDB", ""); - linkdbReader.switchInput(newPath); + if (Files.exists(newPathDocs)) { + eventLog.logEvent("INDEX-SWITCH-DOCKDB", ""); + documentDbReader.switchInput(newPathDocs); + } + + Path newPathDomains = IndexLocations + .getLinkdbWritePath(fileStorageService) + .resolve(DOMAIN_LINKS_FILE_NAME); + + if (Files.exists(newPathDomains)) { + eventLog.logEvent("INDEX-SWITCH-DOMAIN-LINKDB", ""); + domainLinkDb.switchInput(newPathDomains); } } diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/results/IndexResultDecorator.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/results/IndexResultDecorator.java index 376972b8..0994fcbc 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/results/IndexResultDecorator.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/results/IndexResultDecorator.java @@ -7,8 +7,8 @@ import gnu.trove.list.array.TLongArrayList; import nu.marginalia.index.client.model.results.DecoratedSearchResultItem; import nu.marginalia.index.client.model.results.ResultRankingContext; import nu.marginalia.index.client.model.results.SearchResultItem; -import nu.marginalia.linkdb.LinkdbReader; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.DocumentDbReader; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import nu.marginalia.ranking.ResultValuator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,13 +25,13 @@ public class IndexResultDecorator { private static final Logger logger = LoggerFactory.getLogger(IndexResultDecorator.class); - private final LinkdbReader linkdbReader; + private final DocumentDbReader documentDbReader; private final ResultValuator valuator; @Inject - public IndexResultDecorator(LinkdbReader linkdbReader, + public IndexResultDecorator(DocumentDbReader documentDbReader, ResultValuator valuator) { - this.linkdbReader = linkdbReader; + this.documentDbReader = documentDbReader; this.valuator = valuator; } @@ -46,9 +46,9 @@ public class IndexResultDecorator { for (var result : rawResults) idsList.add(result.getDocumentId()); - Map urlDetailsById = new HashMap<>(rawResults.size()); + Map urlDetailsById = new HashMap<>(rawResults.size()); - for (var item : linkdbReader.getUrlDetails(idsList)) + for (var item : documentDbReader.getUrlDetails(idsList)) urlDetailsById.put(item.urlId(), item); List decoratedItems = new ArrayList<>(); @@ -63,7 +63,7 @@ public class IndexResultDecorator { } private DecoratedSearchResultItem createCombinedItem(SearchResultItem result, - LdbUrlDetail linkData, + DocdbUrlDetail linkData, ResultRankingContext rankingContext) { return new DecoratedSearchResultItem( result, diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexDomainLinksService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexDomainLinksService.java new file mode 100644 index 00000000..04b33e6c --- /dev/null +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexDomainLinksService.java @@ -0,0 +1,104 @@ +package nu.marginalia.index.svc; + +import com.google.inject.Inject; +import io.grpc.stub.StreamObserver; +import nu.marginalia.index.api.*; +import nu.marginalia.linkdb.DomainLinkDb; + +import static io.grpc.stub.ServerCalls.asyncUnimplementedUnaryCall; + +/** GRPC service for interrogating domain links + */ +public class IndexDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase { + private final DomainLinkDb domainLinkDb; + + @Inject + public IndexDomainLinksService(DomainLinkDb domainLinkDb) { + this.domainLinkDb = domainLinkDb; + } + + public void getAllLinks(nu.marginalia.index.api.Empty request, + io.grpc.stub.StreamObserver responseObserver) { + + try (var idsConverter = new AllIdsResponseConverter(responseObserver)) { + domainLinkDb.forEach(idsConverter::accept); + } + + responseObserver.onCompleted(); + } + + private static class AllIdsResponseConverter implements AutoCloseable { + private RpcDomainIdPairs.Builder builder; + private final io.grpc.stub.StreamObserver responseObserver; + private int n = 0; + + private AllIdsResponseConverter(io.grpc.stub.StreamObserver responseObserver) { + this.responseObserver = responseObserver; + this.builder = RpcDomainIdPairs.newBuilder(); + } + + public void accept(int source, int dest) { + builder.addSourceIds(source); + builder.addDestIds(dest); + + if (++n > 1000) { + responseObserver.onNext(builder.build()); + builder = RpcDomainIdPairs.newBuilder(); + n = 0; + } + } + + @Override + public void close() { + if (n > 0) { + responseObserver.onNext(builder.build()); + } + } + } + + @Override + public void getLinksFromDomain(RpcDomainId request, + StreamObserver responseObserver) { + + var links = domainLinkDb.findDestinations(request.getDomainId()); + + var rspBuilder = RpcDomainIdList.newBuilder(); + for (int i = 0; i < links.size(); i++) { + rspBuilder.addDomainId(links.get(i)); + } + responseObserver.onNext(rspBuilder.build()); + + responseObserver.onCompleted(); + } + + @Override + public void getLinksToDomain(RpcDomainId request, + StreamObserver responseObserver) { + + var links = domainLinkDb.findSources(request.getDomainId()); + + var rspBuilder = RpcDomainIdList.newBuilder(); + for (int i = 0; i < links.size(); i++) { + rspBuilder.addDomainId(links.get(i)); + } + responseObserver.onNext(rspBuilder.build()); + + responseObserver.onCompleted(); + } + + public void countLinksFromDomain(RpcDomainId request, + StreamObserver responseObserver) { + responseObserver.onNext(RpcDomainIdCount.newBuilder() + .setIdCount(domainLinkDb.countDestinations(request.getDomainId())) + .build()); + responseObserver.onCompleted(); + } + + public void countLinksToDomain(RpcDomainId request, + StreamObserver responseObserver) { + responseObserver.onNext(RpcDomainIdCount.newBuilder() + .setIdCount(domainLinkDb.countSources(request.getDomainId())) + .build()); + responseObserver.onCompleted(); + } +} diff --git a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java index 397e124e..37d52f2e 100644 --- a/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java +++ b/code/services-core/index-service/src/main/java/nu/marginalia/index/svc/IndexOpsService.java @@ -59,6 +59,7 @@ public class IndexOpsService { public Optional run(Callable c) throws Exception { if (!opsLock.tryLock()) return Optional.empty(); + try { return Optional.of(c.call()); } diff --git a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationSmokeTest.java b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationSmokeTest.java index 2066d59d..cd69188c 100644 --- a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationSmokeTest.java +++ b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationSmokeTest.java @@ -24,9 +24,9 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.query.limit.QueryLimits; import nu.marginalia.index.query.limit.QueryStrategy; import nu.marginalia.index.query.limit.SpecificationLimit; -import nu.marginalia.linkdb.LinkdbReader; -import nu.marginalia.linkdb.LinkdbWriter; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.DocumentDbReader; +import nu.marginalia.linkdb.DocumentDbWriter; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.model.idx.WordFlags; @@ -51,6 +51,7 @@ import java.sql.SQLException; import java.util.*; import java.util.stream.IntStream; +import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -80,7 +81,7 @@ public class IndexQueryServiceIntegrationSmokeTest { DomainRankings domainRankings; @Inject - LinkdbReader linkdbReader; + DocumentDbReader documentDbReader; @Inject ProcessHeartbeat processHeartbeat; @@ -103,15 +104,15 @@ public class IndexQueryServiceIntegrationSmokeTest { @Test public void willItBlend() throws Exception { - var linkdbWriter = new LinkdbWriter( + var linkdbWriter = new DocumentDbWriter( IndexLocations.getLinkdbLivePath(fileStorageService) - .resolve("links.db") + .resolve(DOCDB_FILE_NAME) ); for (int i = 1; i < 512; i++) { loadData(linkdbWriter, i); } linkdbWriter.close(); - linkdbReader.reconnect(); + documentDbReader.reconnect(); indexJournalWriter.close(); constructIndex(); @@ -146,15 +147,15 @@ public class IndexQueryServiceIntegrationSmokeTest { @Test public void testDomainQuery() throws Exception { - var linkdbWriter = new LinkdbWriter( + var linkdbWriter = new DocumentDbWriter( IndexLocations.getLinkdbLivePath(fileStorageService) - .resolve("links.db") + .resolve(DOCDB_FILE_NAME) ); for (int i = 1; i < 512; i++) { loadDataWithDomain(linkdbWriter, i/100, i); } linkdbWriter.close(); - linkdbReader.reconnect(); + documentDbReader.reconnect(); indexJournalWriter.close(); constructIndex(); @@ -183,15 +184,15 @@ public class IndexQueryServiceIntegrationSmokeTest { @Test public void testYearQuery() throws Exception { - var linkdbWriter = new LinkdbWriter( + var linkdbWriter = new DocumentDbWriter( IndexLocations.getLinkdbLivePath(fileStorageService) - .resolve("links.db") + .resolve(DOCDB_FILE_NAME) ); for (int i = 1; i < 512; i++) { loadData(linkdbWriter, i); } linkdbWriter.close(); - linkdbReader.reconnect(); + documentDbReader.reconnect(); indexJournalWriter.close(); constructIndex(); @@ -283,7 +284,7 @@ public class IndexQueryServiceIntegrationSmokeTest { MurmurHash3_128 hasher = new MurmurHash3_128(); @SneakyThrows - public void loadData(LinkdbWriter ldbw, int id) { + public void loadData(DocumentDbWriter ldbw, int id) { int[] factors = IntStream .rangeClosed(1, id) .filter(v -> (id % v) == 0) @@ -299,7 +300,7 @@ public class IndexQueryServiceIntegrationSmokeTest { data[2 * i + 1] = new WordMetadata(i, EnumSet.of(WordFlags.Title)).encode(); } - ldbw.add(new LdbUrlDetail( + ldbw.add(new DocdbUrlDetail( fullId, new EdgeUrl("https://www.example.com/"+id), "test", "test", 0., "HTML5", 0, null, 0, 10 )); @@ -308,7 +309,7 @@ public class IndexQueryServiceIntegrationSmokeTest { } @SneakyThrows - public void loadDataWithDomain(LinkdbWriter ldbw, int domain, int id) { + public void loadDataWithDomain(DocumentDbWriter ldbw, int domain, int id) { int[] factors = IntStream.rangeClosed(1, id).filter(v -> (id % v) == 0).toArray(); long fullId = UrlIdCodec.encodeId(domain, id); var header = new IndexJournalEntryHeader(factors.length, 0, fullId, DocumentMetadata.defaultValue()); @@ -319,7 +320,7 @@ public class IndexQueryServiceIntegrationSmokeTest { data[2*i + 1] = new WordMetadata(i, EnumSet.of(WordFlags.Title)).encode(); } - ldbw.add(new LdbUrlDetail( + ldbw.add(new DocdbUrlDetail( fullId, new EdgeUrl("https://www.example.com/"+id), "test", "test", 0., "HTML5", 0, null, 0, 10 )); diff --git a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTest.java b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTest.java index 8224101a..17acc7c4 100644 --- a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTest.java +++ b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTest.java @@ -23,9 +23,9 @@ import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.query.limit.QueryLimits; import nu.marginalia.index.query.limit.QueryStrategy; import nu.marginalia.index.query.limit.SpecificationLimit; -import nu.marginalia.linkdb.LinkdbReader; -import nu.marginalia.linkdb.LinkdbWriter; -import nu.marginalia.linkdb.model.LdbUrlDetail; +import nu.marginalia.linkdb.DocumentDbReader; +import nu.marginalia.linkdb.DocumentDbWriter; +import nu.marginalia.linkdb.model.DocdbUrlDetail; import nu.marginalia.model.EdgeUrl; import nu.marginalia.model.crawl.PubDate; import nu.marginalia.model.id.UrlIdCodec; @@ -53,6 +53,7 @@ import java.sql.SQLException; import java.util.*; import java.util.function.Function; +import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD; @@ -84,7 +85,7 @@ public class IndexQueryServiceIntegrationTest { @Inject ProcessHeartbeat processHeartbeat; @Inject - LinkdbReader linkdbReader; + DocumentDbReader documentDbReader; @BeforeEach public void setUp() throws IOException { @@ -566,11 +567,11 @@ public class IndexQueryServiceIntegrationTest { indexJournalWriter.put(header, entry); }); - var linkdbWriter = new LinkdbWriter( - IndexLocations.getLinkdbLivePath(fileStorageService).resolve("links.db") + var linkdbWriter = new DocumentDbWriter( + IndexLocations.getLinkdbLivePath(fileStorageService).resolve(DOCDB_FILE_NAME) ); for (Long key : allData.keySet()) { - linkdbWriter.add(new LdbUrlDetail( + linkdbWriter.add(new DocdbUrlDetail( key, new EdgeUrl("https://www.example.com"), "test", @@ -587,7 +588,7 @@ public class IndexQueryServiceIntegrationTest { indexJournalWriter.close(); constructIndex(); - linkdbReader.reconnect(); + documentDbReader.reconnect(); searchIndex.switchIndex(); } } diff --git a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java index d128a690..79e722a0 100644 --- a/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java +++ b/code/services-core/index-service/src/test/java/nu/marginalia/index/svc/IndexQueryServiceIntegrationTestModule.java @@ -7,7 +7,7 @@ import nu.marginalia.storage.model.FileStorageBase; import nu.marginalia.storage.model.FileStorageBaseType; import nu.marginalia.index.journal.writer.IndexJournalWriter; import nu.marginalia.index.journal.writer.IndexJournalWriterPagingImpl; -import nu.marginalia.linkdb.LinkdbReader; +import nu.marginalia.linkdb.DocumentDbReader; import nu.marginalia.process.control.FakeProcessHeartbeat; import nu.marginalia.process.control.ProcessHeartbeat; import nu.marginalia.ranking.DomainRankings; @@ -26,6 +26,7 @@ import java.sql.SQLException; import java.util.Random; import java.util.UUID; +import static nu.marginalia.linkdb.LinkdbFileNames.DOCDB_FILE_NAME; import static org.mockito.Mockito.when; public class IndexQueryServiceIntegrationTestModule extends AbstractModule { @@ -57,9 +58,9 @@ public class IndexQueryServiceIntegrationTestModule extends AbstractModule { Mockito.when(fileStorageServiceMock.getStorageBase(FileStorageBaseType.CURRENT)).thenReturn(new FileStorageBase(null, null, 0,null, fastDir.toString())); Mockito.when(fileStorageServiceMock.getStorageBase(FileStorageBaseType.STORAGE)).thenReturn(new FileStorageBase(null, null, 0, null, fastDir.toString())); - bind(LinkdbReader.class).toInstance(new LinkdbReader( + bind(DocumentDbReader.class).toInstance(new DocumentDbReader( IndexLocations.getLinkdbLivePath(fileStorageServiceMock) - .resolve("links.db") + .resolve(DOCDB_FILE_NAME) )); bind(FileStorageService.class).toInstance(fileStorageServiceMock); diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java new file mode 100644 index 00000000..78cfc637 --- /dev/null +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCDomainLinksService.java @@ -0,0 +1,96 @@ +package nu.marginalia.query; + +import com.google.inject.Inject; +import io.grpc.ManagedChannel; +import io.grpc.stub.StreamObserver; +import nu.marginalia.index.api.IndexDomainLinksApiGrpc; +import nu.marginalia.index.api.RpcDomainIdCount; +import nu.marginalia.index.api.RpcDomainIdList; +import nu.marginalia.index.api.RpcDomainIdPairs; +import nu.marginalia.query.svc.NodeConfigurationWatcher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class QueryGRPCDomainLinksService extends IndexDomainLinksApiGrpc.IndexDomainLinksApiImplBase { + private static final Logger logger = LoggerFactory.getLogger(QueryGRPCDomainLinksService.class); + private final NodeConfigurationWatcher nodeConfigurationWatcher; + private final QueryGrpcStubPool stubPool; + + @Inject + public QueryGRPCDomainLinksService(NodeConfigurationWatcher nodeConfigurationWatcher) { + this.nodeConfigurationWatcher = nodeConfigurationWatcher; + stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) { + @Override + public IndexDomainLinksApiGrpc.IndexDomainLinksApiBlockingStub createStub(ManagedChannel channel) { + return IndexDomainLinksApiGrpc.newBlockingStub(channel); + } + }; + } + + @Override + public void getAllLinks(nu.marginalia.index.api.Empty request, + StreamObserver responseObserver) { + stubPool.callEachSequential(stub -> stub.getAllLinks(request)) + .forEach( + iter -> iter.forEachRemaining(responseObserver::onNext) + ); + + responseObserver.onCompleted(); + } + + @Override + public void getLinksFromDomain(nu.marginalia.index.api.RpcDomainId request, + StreamObserver responseObserver) { + var rspBuilder = RpcDomainIdList.newBuilder(); + + stubPool.callEachSequential(stub -> stub.getLinksFromDomain(request)) + .map(RpcDomainIdList::getDomainIdList) + .forEach(rspBuilder::addAllDomainId); + + responseObserver.onNext(rspBuilder.build()); + responseObserver.onCompleted(); + } + + @Override + public void getLinksToDomain(nu.marginalia.index.api.RpcDomainId request, + StreamObserver responseObserver) { + var rspBuilder = RpcDomainIdList.newBuilder(); + + stubPool.callEachSequential(stub -> stub.getLinksToDomain(request)) + .map(RpcDomainIdList::getDomainIdList) + .forEach(rspBuilder::addAllDomainId); + + responseObserver.onNext(rspBuilder.build()); + responseObserver.onCompleted(); + } + + @Override + public void countLinksFromDomain(nu.marginalia.index.api.RpcDomainId request, + StreamObserver responseObserver) { + + int sum = stubPool.callEachSequential(stub -> stub.countLinksFromDomain(request)) + .mapToInt(RpcDomainIdCount::getIdCount) + .sum(); + + var rspBuilder = RpcDomainIdCount.newBuilder(); + rspBuilder.setIdCount(sum); + responseObserver.onNext(rspBuilder.build()); + responseObserver.onCompleted(); + } + + @Override + public void countLinksToDomain(nu.marginalia.index.api.RpcDomainId request, + io.grpc.stub.StreamObserver responseObserver) { + + int sum = stubPool.callEachSequential(stub -> stub.countLinksToDomain(request)) + .mapToInt(RpcDomainIdCount::getIdCount) + .sum(); + + var rspBuilder = RpcDomainIdCount.newBuilder(); + rspBuilder.setIdCount(sum); + responseObserver.onNext(rspBuilder.build()); + responseObserver.onCompleted(); + } + +} diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java index 9e14ef15..5f59ee15 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGRPCService.java @@ -2,7 +2,6 @@ package nu.marginalia.query; import com.google.inject.Inject; import io.grpc.ManagedChannel; -import io.grpc.ManagedChannelBuilder; import io.prometheus.client.Histogram; import lombok.SneakyThrows; import nu.marginalia.db.DomainBlacklist; @@ -10,7 +9,6 @@ import nu.marginalia.index.api.*; import nu.marginalia.model.id.UrlIdCodec; import nu.marginalia.query.svc.NodeConfigurationWatcher; import nu.marginalia.query.svc.QueryFactory; -import org.jetbrains.annotations.NotNull; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,32 +26,7 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { .help("QS-side query time (GRPC endpoint)") .register(); - private final Map channels - = new ConcurrentHashMap<>(); - private final Map actorRpcApis - = new ConcurrentHashMap<>(); - - private ManagedChannel getChannel(ServiceAndNode serviceAndNode) { - return channels.computeIfAbsent(serviceAndNode, - san -> ManagedChannelBuilder - .forAddress(serviceAndNode.getHostName(), 81) - .usePlaintext() - .build()); - } - - public IndexApiGrpc.IndexApiBlockingStub indexApi(int node) { - return actorRpcApis.computeIfAbsent(new ServiceAndNode("index-service", node), n -> - IndexApiGrpc.newBlockingStub( - getChannel(n) - ) - ); - } - - record ServiceAndNode(String service, int node) { - public String getHostName() { - return service+"-"+node; - } - } + private final QueryGrpcStubPool stubPool; private final QueryFactory queryFactory; private final DomainBlacklist blacklist; @@ -64,6 +37,13 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { this.queryFactory = queryFactory; this.blacklist = blacklist; this.nodeConfigurationWatcher = nodeConfigurationWatcher; + + stubPool = new QueryGrpcStubPool<>(nodeConfigurationWatcher) { + @Override + public IndexApiGrpc.IndexApiBlockingStub createStub(ManagedChannel channel) { + return IndexApiGrpc.newBlockingStub(channel); + } + }; } public void query(nu.marginalia.index.api.RpcQsQuery request, @@ -89,7 +69,6 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { responseBuilder.setDomain(query.domain); responseObserver.onNext(responseBuilder.build()); - responseObserver.onCompleted(); }); } catch (Exception e) { @@ -98,16 +77,13 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { } } - private final ExecutorService es = Executors.newVirtualThreadPerTaskExecutor(); - private static final Comparator comparator = Comparator.comparing(RpcDecoratedResultItem::getRankingScore); @SneakyThrows private List executeQueries(RpcIndexQuery indexRequest, int totalSize) { - List>> tasks = createTasks(indexRequest); - - return es.invokeAll(tasks).stream() + return stubPool.invokeAll(stub -> new QueryTask(stub, indexRequest)) + .stream() .filter(f -> f.state() == Future.State.SUCCESS) .map(Future::resultNow) .flatMap(List::stream) @@ -116,26 +92,30 @@ public class QueryGRPCService extends QueryApiGrpc.QueryApiImplBase { .toList(); } - @NotNull - private List>> createTasks(RpcIndexQuery indexRequest) { - List>> tasks = new ArrayList<>(); + private class QueryTask implements Callable> { + private final IndexApiGrpc.IndexApiBlockingStub stub; + private final RpcIndexQuery indexRequest; - for (var node : nodeConfigurationWatcher.getQueryNodes()) { - tasks.add(() -> { - var responseIter = indexApi(node).query(indexRequest); - var ret = new ArrayList(); - while (responseIter.hasNext()) { - RpcDecoratedResultItem next = responseIter.next(); - if (isBlacklisted(next)) - continue; - ret.add(next); - } - return ret; - }); + public QueryTask(IndexApiGrpc.IndexApiBlockingStub stub, RpcIndexQuery indexRequest) { + this.stub = stub; + this.indexRequest = indexRequest; } - return tasks; - } + @Override + public List call() { + var rsp = stub.query(indexRequest); + List ret = new ArrayList<>(); + + while (rsp.hasNext()) { + RpcDecoratedResultItem next = rsp.next(); + if (isBlacklisted(next)) + continue; + ret.add(next); + } + + return ret; + } + } private boolean isBlacklisted(RpcDecoratedResultItem item) { return blacklist.isBlacklisted(UrlIdCodec.getDomainId(item.getRawItem().getCombinedId())); diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java new file mode 100644 index 00000000..ed95b18c --- /dev/null +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryGrpcStubPool.java @@ -0,0 +1,64 @@ +package nu.marginalia.query; + +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; +import nu.marginalia.query.svc.NodeConfigurationWatcher; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Function; +import java.util.stream.Stream; + +public abstract class QueryGrpcStubPool { + protected record ServiceAndNode(String service, int node) { + public String getHostName() { + return service+"-"+node; + } + } + + private final NodeConfigurationWatcher nodeConfigurationWatcher; + private final Map channels = new ConcurrentHashMap<>(); + private final Map actorRpcApis = new ConcurrentHashMap<>(); + private final ExecutorService virtualExecutorService = Executors.newVirtualThreadPerTaskExecutor(); + + QueryGrpcStubPool(NodeConfigurationWatcher nodeConfigurationWatcher) { + this.nodeConfigurationWatcher = nodeConfigurationWatcher; + } + + /** Get an API stub for the given node */ + public STUB indexApi(int node) { + var san = new ServiceAndNode("index-service", node); + return actorRpcApis.computeIfAbsent(san, n -> + createStub(channels.computeIfAbsent(san, this::createChannel)) + ); + } + + protected ManagedChannel createChannel(ServiceAndNode serviceAndNode) { + return ManagedChannelBuilder.forAddress(serviceAndNode.getHostName(), 81).usePlaintext().build(); + } + + /** Invoke a function on each node, returning a list of futures in a terminal state, as per + * ExecutorService$invokeAll */ + public List> invokeAll(Function> callF) throws InterruptedException { + List> calls = nodeConfigurationWatcher.getQueryNodes().stream() + .map(id -> callF.apply(indexApi(id))) + .toList(); + + return virtualExecutorService.invokeAll(calls); + } + + /** Invoke a function on each node, returning a stream of results */ + public Stream callEachSequential(Function call) { + return nodeConfigurationWatcher.getQueryNodes().stream() + .map(id -> call.apply(indexApi(id))); + } + + + /** Create a stub for the given channel, this is an operation + * that needs to be implemented for the particular API this + * pool is intended for + */ + public abstract STUB createStub(ManagedChannel channel); + +} diff --git a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java index d78b92bc..fc8bc8fb 100644 --- a/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java +++ b/code/services-core/query-service/src/main/java/nu/marginalia/query/QueryService.java @@ -42,6 +42,7 @@ public class QueryService extends Service { public QueryService(BaseServiceParams params, IndexClient indexClient, NodeConfigurationWatcher nodeWatcher, + QueryGRPCDomainLinksService domainLinksService, QueryGRPCService queryGRPCService, Gson gson, DomainBlacklist blacklist, @@ -55,6 +56,7 @@ public class QueryService extends Service { var grpcServer = ServerBuilder.forPort(params.configuration.port() + 1) .addService(queryGRPCService) + .addService(domainLinksService) .build(); grpcServer.start();