diff --git a/code/common/db/src/main/java/nu/marginalia/db/DomainTypes.java b/code/common/db/src/main/java/nu/marginalia/db/DomainTypes.java
index 021f1c3b..94d3ca6e 100644
--- a/code/common/db/src/main/java/nu/marginalia/db/DomainTypes.java
+++ b/code/common/db/src/main/java/nu/marginalia/db/DomainTypes.java
@@ -163,6 +163,14 @@ public class DomainTypes {
         }
     }
 
+    /** Download the domains list configured for the given type;
+     *  returns an empty list when no URL is configured. */
+    public List<String> downloadList(Type type) throws IOException {
+        var url = getUrlForSelection(type);
+        if (url.isBlank())
+            return List.of();
+        return downloadDomainsList(url);
+    }
+
+
     private List<String> downloadDomainsList(String source) throws IOException {
         if (source.isBlank())
             return List.of();
diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle
index d95b5070..a899a32e 100644
--- a/code/services-core/executor-service/build.gradle
+++ b/code/services-core/executor-service/build.gradle
@@ -58,5 +58,10 @@ dependencies {
 
     testImplementation libs.bundles.junit
     testImplementation libs.mockito
+
+    testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
+    testImplementation 'org.testcontainers:mariadb:1.17.4'
+    testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
+
 }
diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java
index 72140509..96962481 100644
--- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java
+++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java
@@ -18,6 +18,7 @@ import nu.marginalia.storage.model.FileStorageType;
 import nu.marginalia.mq.MqMessageState;
 import nu.marginalia.mq.outbox.MqOutbox;
 import nu.marginalia.mqapi.crawling.CrawlRequest;
+import nu.marginalia.svc.DomainListRefreshService;
 import org.jetbrains.annotations.NotNull;
 
 import java.nio.file.Files;
@@ -33,6 +34,7 @@ public
class RecrawlActor extends AbstractActorPrototype {
 
     public static final String END = "END";
     private final MqOutbox mqCrawlerOutbox;
     private final FileStorageService storageService;
+    private final DomainListRefreshService refreshService;
     private final Gson gson;
     private final ActorProcessWatcher processWatcher;
 
@@ -62,6 +64,7 @@ public class RecrawlActor extends AbstractActorPrototype {
                          ActorProcessWatcher processWatcher,
                          ProcessOutboxes processOutboxes,
                          FileStorageService storageService,
+                         DomainListRefreshService refreshService,
                          Gson gson
                          ) {
@@ -69,6 +72,7 @@ public class RecrawlActor extends AbstractActorPrototype {
         this.processWatcher = processWatcher;
         this.mqCrawlerOutbox = processOutboxes.getCrawlerOutbox();
         this.storageService = storageService;
+        this.refreshService = refreshService;
         this.gson = gson;
     }
 
@@ -89,6 +93,8 @@ public class RecrawlActor extends AbstractActorPrototype {
 
         Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));
 
+        refreshService.synchronizeDomainList();
+
         return recrawlMessage;
     }
 
diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/svc/DomainListRefreshService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/svc/DomainListRefreshService.java
new file mode 100644
index 00000000..2543f963
--- /dev/null
+++ b/code/services-core/executor-service/src/main/java/nu/marginalia/svc/DomainListRefreshService.java
@@ -0,0 +1,105 @@
+package nu.marginalia.svc;
+
+import com.google.inject.Inject;
+import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.db.DomainTypes;
+import nu.marginalia.model.EdgeDomain;
+import nu.marginalia.service.module.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+
+public class DomainListRefreshService {
+
+    private final HikariDataSource dataSource;
+    private final DomainTypes domainTypes;
+    private final int
nodeId;
+
+    private static final Logger logger = LoggerFactory.getLogger(DomainListRefreshService.class);
+
+    @Inject
+    public DomainListRefreshService(HikariDataSource dataSource,
+                                    DomainTypes domainTypes,
+                                    ServiceConfiguration serviceConfiguration
+                                    ) {
+        this.dataSource = dataSource;
+        this.domainTypes = domainTypes;
+        this.nodeId = serviceConfiguration.node();
+    }
+
+    /** Downloads URLs from the file specified in DomainType.CRAWL and inserts them
+     * into the domain table, assigning them to the current partition
+     */
+    public void synchronizeDomainList() {
+        try (var conn = dataSource.getConnection();
+             var insert = conn.prepareStatement("""
+                     INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY)
+                     VALUES (?, ?, ?)
+                     """);
+             var update = conn.prepareStatement("""
+                     UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE DOMAIN_NAME=? AND NODE_AFFINITY <= 0
+                     """)
+
+        ){
+            List<String> domainsAll = new ArrayList<>();
+            domainsAll.addAll(getCrawlQueue(conn));
+            domainsAll.addAll(domainTypes.downloadList(DomainTypes.Type.CRAWL));
+
+            // Case 1: The domains are in the table, but have no affinity defined
+            for (var domain : domainsAll) {
+                // Bind in SQL order: parameter 1 is NODE_AFFINITY (int),
+                // parameter 2 is DOMAIN_NAME (string) -- see the UPDATE above
+                update.setInt(1, nodeId);
+                update.setString(2, domain.toLowerCase());
+                update.addBatch();
+            }
+            update.executeBatch();
+
+
+            // Case 2: The domains are missing from the table
+            for (var domain : domainsAll) {
+                var parsed = new EdgeDomain(domain);
+                insert.setString(1, domain.toLowerCase());
+                insert.setString(2, parsed.domain);
+                insert.setInt(3, nodeId);
+                insert.addBatch();
+            }
+            insert.executeBatch();
+
+            cleanCrawlQueue(conn);
+        }
+        catch (Exception ex) {
+            logger.warn("Failed to insert domains", ex);
+        }
+    }
+
+    /** Fetch all domain names currently waiting in CRAWL_QUEUE;
+     *  best-effort: returns what was read so far on failure. */
+    private List<String> getCrawlQueue(Connection connection) {
+        List<String> ret = new ArrayList<>();
+        try (var q = connection.prepareStatement("SELECT DOMAIN_NAME FROM CRAWL_QUEUE")) {
+            var rs = q.executeQuery();
+            while (rs.next()) {
+                ret.add(rs.getString(1));
+            }
+        }
+        catch (Exception ex) {
+            logger.warn("Failed to fetch from
crawl queue", ex);
+        }
+
+        return ret;
+    }
+
+    /** Remove queue entries for domains that are already assigned to a node. */
+    private void cleanCrawlQueue(Connection connection) {
+        try (var stmt = connection.createStatement()) {
+            stmt.executeUpdate("""
+                    DELETE CRAWL_QUEUE
+                    FROM CRAWL_QUEUE INNER JOIN EC_DOMAIN ON CRAWL_QUEUE.DOMAIN_NAME=EC_DOMAIN.DOMAIN_NAME
+                    WHERE NODE_AFFINITY>0
+                    """);
+        } catch (SQLException e) {
+            logger.warn("Failed to clean up crawl queue", e);
+        }
+    }
+
+}
diff --git a/code/services-core/executor-service/src/test/java/nu/marginalia/svc/DomainListRefreshServiceTest.java b/code/services-core/executor-service/src/test/java/nu/marginalia/svc/DomainListRefreshServiceTest.java
new file mode 100644
index 00000000..cdf52c3a
--- /dev/null
+++ b/code/services-core/executor-service/src/test/java/nu/marginalia/svc/DomainListRefreshServiceTest.java
@@ -0,0 +1,107 @@
+package nu.marginalia.svc;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import nu.marginalia.db.DomainTypes;
+import nu.marginalia.service.module.ServiceConfiguration;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.MariaDBContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.io.IOException;
+import java.sql.SQLException;
+import java.util.*;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+@Testcontainers
+@Execution(ExecutionMode.SAME_THREAD)
+@Tag("slow")
+class DomainListRefreshServiceTest {
+    @Container
+    static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
+            .withDatabaseName("WMSA_prod")
+            .withUsername("wmsa")
+            .withPassword("wmsa")
+            .withInitScript("db/migration/V23_06_0_000__base.sql")
+            .withNetworkAliases("mariadb");
+
+    static HikariDataSource dataSource;
+
+    @BeforeAll
+    public static void setup() {
+        HikariConfig config = new HikariConfig();
+        config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
+        config.setUsername("wmsa");
+        config.setPassword("wmsa");
+
+        dataSource = new HikariDataSource(config);
+
+        // apply migrations
+
+        List<String> migrations = List.of(
+                "db/migration/V23_06_0_003__crawl-queue.sql",
+                "db/migration/V23_07_0_001__domain_type.sql",
+                "db/migration/V23_11_0_007__domain_node_affinity.sql"
+                );
+        for (String migration : migrations) {
+            try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration),
+                    "Could not load migration script " + migration);
+                 var conn = dataSource.getConnection();
+                 var stmt = conn.createStatement()
+            ) {
+                String script = new String(resource.readAllBytes());
+                String[] cmds = script.split("\\s*;\\s*");
+                for (String cmd : cmds) {
+                    if (cmd.isBlank())
+                        continue;
+                    System.out.println(cmd);
+                    stmt.executeUpdate(cmd);
+                }
+            } catch (IOException | SQLException ex) {
+                // Fail fast: silently swallowing a broken migration would leave
+                // the schema half-applied and surface as confusing failures later
+                throw new RuntimeException("Failed to apply migration " + migration, ex);
+            }
+        }
+    }
+
+    @AfterAll
+    public static void shutDown() {
+        dataSource.close();
+    }
+
+    @Test
+    void downloadDomainsList() throws SQLException {
+        DomainTypes domainTypes = new DomainTypes(dataSource);
+        DomainListRefreshService service = new DomainListRefreshService(dataSource,
+                domainTypes, new ServiceConfiguration(null, 1, null, -1, -1, null));
+
+        domainTypes.updateUrlForSelection(DomainTypes.Type.CRAWL, "https://downloads.marginalia.nu/domain-list-test.txt");
+        service.synchronizeDomainList();
+
+        Map<String, Integer> result = new HashMap<>();
+        try (var conn = dataSource.getConnection();
+             var qs = conn.prepareStatement("""
+                     SELECT DOMAIN_NAME, NODE_AFFINITY FROM EC_DOMAIN
+                     """))
+        {
+            var rs = qs.executeQuery();
+            while (rs.next()) {
+                result.put(rs.getString(1), rs.getInt(2));
+            }
+        }
+        assertEquals(
+                Map.of(
+                        "memex.marginalia.nu", 1,
+                        "encyclopedia.marginalia.nu", 1,
+                        "search.marginalia.nu", 1,
+                        "www.marginalia.nu", 1
+                ),
+                result);
+    }
+}
\ No newline at end of file