(executor) Executor service now pulls the DomainType list for CRAWL on "recrawl"
This is an automatic integration with the submit-site repo on GitHub, as well as with the crawl queue.
This commit is contained in:
parent c0930ead0f · commit 23526f6d1a
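At a high level the integration comes down to two calls, both of which appear verbatim in the new test at the bottom of this diff: point the CRAWL domain type at a remote domain list, then let the refresh service merge that list, along with the CRAWL_QUEUE table, into EC_DOMAIN. A minimal sketch, assuming dataSource and serviceConfiguration come from the usual dependency injection, and using the test-fixture URL rather than the real submit-site list:

    // Condensed from DomainListRefreshServiceTest below; not production wiring.
    void refreshCrawlDomains(HikariDataSource dataSource,
                             ServiceConfiguration serviceConfiguration) throws SQLException {
        DomainTypes domainTypes = new DomainTypes(dataSource);
        DomainListRefreshService service =
                new DomainListRefreshService(dataSource, domainTypes, serviceConfiguration);

        // Tell the CRAWL domain type where to fetch its list from
        domainTypes.updateUrlForSelection(DomainTypes.Type.CRAWL,
                "https://downloads.marginalia.nu/domain-list-test.txt");

        // Merge CRAWL_QUEUE and the downloaded list into EC_DOMAIN,
        // then clear the now-assigned entries from CRAWL_QUEUE
        service.synchronizeDomainList();
    }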
@@ -163,6 +163,14 @@ public class DomainTypes {
        }
    }

    public List<String> downloadList(Type type) throws IOException {
        var url = getUrlForSelection(type);
        if (url.isBlank())
            return List.of();
        return downloadDomainsList(url);
    }


    private List<String> downloadDomainsList(String source) throws IOException {
        if (source.isBlank())
            return List.of();
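The hunk ends before the body of downloadDomainsList. Purely as a hypothetical sketch of the missing part, assuming the source URL points at a plain-text file with one domain per line (the shape the test fixture at the bottom of this diff implies), it might read:

    // Hypothetical completion -- the real body lies outside this hunk.
    // Assumes a newline-separated domain list; beyond the class's existing
    // imports this needs java.io.BufferedReader, java.io.InputStreamReader,
    // java.net.URL and java.util.ArrayList.
    private List<String> downloadDomainsList(String source) throws IOException {
        if (source.isBlank())
            return List.of();

        List<String> ret = new ArrayList<>();
        try (var reader = new BufferedReader(
                new InputStreamReader(new URL(source).openStream())))
        {
            for (String line; (line = reader.readLine()) != null; ) {
                line = line.trim();
                if (!line.isBlank())  // skip empty lines
                    ret.add(line);
            }
        }
        return ret;
    }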
@@ -58,5 +58,10 @@ dependencies {
    testImplementation libs.bundles.junit
    testImplementation libs.mockito

    testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
    testImplementation 'org.testcontainers:mariadb:1.17.4'
    testImplementation 'org.testcontainers:junit-jupiter:1.17.4'

}
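These Testcontainers additions back the new DomainListRefreshServiceTest at the bottom of this diff, which runs against a throwaway MariaDB container instead of a mocked database.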
@@ -18,6 +18,7 @@ import nu.marginalia.storage.model.FileStorageType;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mqapi.crawling.CrawlRequest;
import nu.marginalia.svc.DomainListRefreshService;
import org.jetbrains.annotations.NotNull;

import java.nio.file.Files;
@@ -33,6 +34,7 @@ public class RecrawlActor extends AbstractActorPrototype {
    public static final String END = "END";
    private final MqOutbox mqCrawlerOutbox;
    private final FileStorageService storageService;
    private final DomainListRefreshService refreshService;
    private final Gson gson;
    private final ActorProcessWatcher processWatcher;

@@ -62,6 +64,7 @@ public class RecrawlActor extends AbstractActorPrototype {
                        ActorProcessWatcher processWatcher,
                        ProcessOutboxes processOutboxes,
                        FileStorageService storageService,
                        DomainListRefreshService refreshService,
                        Gson gson
                        )
    {
@@ -69,6 +72,7 @@ public class RecrawlActor extends AbstractActorPrototype {
        this.processWatcher = processWatcher;
        this.mqCrawlerOutbox = processOutboxes.getCrawlerOutbox();
        this.storageService = storageService;
        this.refreshService = refreshService;
        this.gson = gson;
    }

@@ -89,6 +93,8 @@ public class RecrawlActor extends AbstractActorPrototype {

        Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));

        refreshService.synchronizeDomainList();

        return recrawlMessage;
    }
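The actor-side change is deliberately small: the domain list is synchronized right before the crawl request is dispatched to the crawler outbox, so domains submitted through submit-site or queued in CRAWL_QUEUE are included in the recrawl that follows.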
@@ -0,0 +1,105 @@
package nu.marginalia.svc;

import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.db.DomainTypes;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

public class DomainListRefreshService {

    private final HikariDataSource dataSource;
    private final DomainTypes domainTypes;
    private final int nodeId;

    private static final Logger logger = LoggerFactory.getLogger(DomainListRefreshService.class);

    @Inject
    public DomainListRefreshService(HikariDataSource dataSource,
                                    DomainTypes domainTypes,
                                    ServiceConfiguration serviceConfiguration
                                    ) {
        this.dataSource = dataSource;
        this.domainTypes = domainTypes;
        this.nodeId = serviceConfiguration.node();
    }

    /** Downloads URLs from the file specified in DomainType.CRAWL and inserts them
     * into the domain table, assigning them to the current partition
     */
    public void synchronizeDomainList() {
        try (var conn = dataSource.getConnection();
             var insert = conn.prepareStatement("""
                     INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY)
                     VALUES (?, ?, ?)
                     """);
             var update = conn.prepareStatement("""
                     UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE DOMAIN_NAME=? AND NODE_AFFINITY <= 0
                     """)
        ) {
            List<String> domainsAll = new ArrayList<>();
            domainsAll.addAll(getCrawlQueue(conn));
            domainsAll.addAll(domainTypes.downloadList(DomainTypes.Type.CRAWL));

            // Case 1: The domains are in the table, but have no affinity defined
            for (var domain : domainsAll) {
                update.setInt(1, nodeId);
                update.setString(2, domain.toLowerCase());
                update.addBatch();
            }
            update.executeBatch();

            // Case 2: The domains are missing from the table
            for (var domain : domainsAll) {
                var parsed = new EdgeDomain(domain);
                insert.setString(1, domain.toLowerCase());
                insert.setString(2, parsed.domain);
                insert.setInt(3, nodeId);
                insert.addBatch();
            }
            insert.executeBatch();

            cleanCrawlQueue(conn);
        }
        catch (Exception ex) {
            logger.warn("Failed to insert domains", ex);
        }
    }

    private List<String> getCrawlQueue(Connection connection) {
        List<String> ret = new ArrayList<>();
        try (var q = connection.prepareStatement("SELECT DOMAIN_NAME FROM CRAWL_QUEUE")) {
            var rs = q.executeQuery();
            while (rs.next()) {
                ret.add(rs.getString(1));
            }
        }
        catch (Exception ex) {
            logger.warn("Failed to fetch from crawl queue", ex);
        }

        return ret;
    }

    private void cleanCrawlQueue(Connection connection) {
        try (var stmt = connection.createStatement()) {
            stmt.executeUpdate("""
                    DELETE CRAWL_QUEUE
                    FROM CRAWL_QUEUE INNER JOIN EC_DOMAIN ON CRAWL_QUEUE.DOMAIN_NAME=EC_DOMAIN.DOMAIN_NAME
                    WHERE NODE_AFFINITY>0
                    """);
        } catch (SQLException e) {
            logger.warn("Failed to clean up crawl queue", e);
        }
    }

}
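Note the two-pass structure of synchronizeDomainList: the batched UPDATE claims domains that already exist in EC_DOMAIN but have no node assignment (NODE_AFFINITY <= 0), while the INSERT IGNORE adds the missing domains without touching rows a node already owns. Only after both passes does cleanCrawlQueue delete queue entries, and only those whose domains now have a positive affinity.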
@@ -0,0 +1,107 @@
package nu.marginalia.svc;

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.db.DomainTypes;
import nu.marginalia.service.module.ServiceConfiguration;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.io.IOException;
import java.sql.SQLException;
import java.util.*;

import static org.junit.jupiter.api.Assertions.*;

@Testcontainers
@Execution(ExecutionMode.SAME_THREAD)
@Tag("slow")
class DomainListRefreshServiceTest {
    @Container
    static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
            .withDatabaseName("WMSA_prod")
            .withUsername("wmsa")
            .withPassword("wmsa")
            .withInitScript("db/migration/V23_06_0_000__base.sql")
            .withNetworkAliases("mariadb");

    static HikariDataSource dataSource;

    @BeforeAll
    public static void setup() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
        config.setUsername("wmsa");
        config.setPassword("wmsa");

        dataSource = new HikariDataSource(config);

        // apply migrations

        List<String> migrations = List.of(
                "db/migration/V23_06_0_003__crawl-queue.sql",
                "db/migration/V23_07_0_001__domain_type.sql",
                "db/migration/V23_11_0_007__domain_node_affinity.sql"
        );
        for (String migration : migrations) {
            try (var resource = Objects.requireNonNull(ClassLoader.getSystemResourceAsStream(migration),
                    "Could not load migration script " + migration);
                 var conn = dataSource.getConnection();
                 var stmt = conn.createStatement()
            ) {
                String script = new String(resource.readAllBytes());
                String[] cmds = script.split("\\s*;\\s*");
                for (String cmd : cmds) {
                    if (cmd.isBlank())
                        continue;
                    System.out.println(cmd);
                    stmt.executeUpdate(cmd);
                }
            } catch (IOException | SQLException ex) {
                throw new RuntimeException("Failed to apply migration " + migration, ex);
            }
        }
    }

    @AfterAll
    public static void shutDown() {
        dataSource.close();
    }

    @Test
    void downloadDomainsList() throws SQLException {
        DomainTypes domainTypes = new DomainTypes(dataSource);
        DomainListRefreshService service = new DomainListRefreshService(dataSource,
                domainTypes, new ServiceConfiguration(null, 1, null, -1, -1, null));

        domainTypes.updateUrlForSelection(DomainTypes.Type.CRAWL, "https://downloads.marginalia.nu/domain-list-test.txt");
        service.synchronizeDomainList();

        Map<String, Integer> result = new HashMap<>();
        try (var conn = dataSource.getConnection();
             var qs = conn.prepareStatement("""
                     SELECT DOMAIN_NAME, NODE_AFFINITY FROM EC_DOMAIN
                     """))
        {
            var rs = qs.executeQuery();
            while (rs.next()) {
                result.put(rs.getString(1), rs.getInt(2));
            }
        }
        assertEquals(
                Map.of(
                        "memex.marginalia.nu", 1,
                        "encyclopedia.marginalia.nu", 1,
                        "search.marginalia.nu", 1,
                        "www.marginalia.nu", 1
                ),
                result);
    }
}