diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
index 8a1ccbe0..580ac3c7 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java
@@ -151,22 +151,22 @@ public class CrawlerMain extends ProcessMainClass {
         System.exit(0);
     }
 
-    public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException {
+    public void run(CrawlSpecProvider specProvider, Path outputDir) throws Exception {
 
         heartbeat.start();
 
+        // First a validation run to ensure the file is all good to parse
+        totalTasks = specProvider.totalCount();
+        if (totalTasks == 0) {
+            // This is an error state, and we should make noise about it
+            throw new IllegalStateException("No crawl tasks found, refusing to continue");
+        }
+        logger.info("Queued {} crawl tasks, let's go", totalTasks);
+
         try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
              WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
              AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
         ) {
-
-            // First a validation run to ensure the file is all good to parse
-            logger.info("Validating JSON");
-
-            totalTasks = specProvider.totalCount();
-
-            logger.info("Queued {} crawl tasks, let's go", totalTasks);
-
             try (var specStream = specProvider.stream()) {
                 specStream
                         .takeWhile((e) -> abortMonitor.isAlive())
@@ -332,7 +332,13 @@ public class CrawlerMain extends ProcessMainClass {
 
             if (request.specStorage != null) {
                 var specData = fileStorageService.getStorage(request.specStorage);
-                specProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));
+                var parquetProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));
+
+                // Ensure the parquet domains are loaded into the database to avoid
+                // rare data-loss scenarios
+                dbCrawlSpecProvider.ensureParquetDomainsLoaded(parquetProvider);
+
+                specProvider = parquetProvider;
             }
             else {
                 specProvider = dbCrawlSpecProvider;
diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java
index 755cec43..412c2c5f 100644
--- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java
+++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java
@@ -4,11 +4,12 @@ import com.google.inject.Inject;
 import com.zaxxer.hikari.HikariDataSource;
 import lombok.SneakyThrows;
 import nu.marginalia.ProcessConfiguration;
+import nu.marginalia.db.DomainBlacklist;
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -17,6 +18,7 @@ import java.util.stream.Stream;
 public class DbCrawlSpecProvider implements CrawlSpecProvider {
     private final HikariDataSource dataSource;
     private final ProcessConfiguration processConfiguration;
+    private final DomainBlacklist blacklist;
     private List<CrawlSpecRecord> domains;
 
     private static final Logger logger = LoggerFactory.getLogger(DbCrawlSpecProvider.class);
@@ -27,21 +29,25 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider {
 
     @Inject
     public DbCrawlSpecProvider(HikariDataSource dataSource,
-                               ProcessConfiguration processConfiguration
+                               ProcessConfiguration processConfiguration,
+                               DomainBlacklist blacklist
     ) {
         this.dataSource = dataSource;
         this.processConfiguration = processConfiguration;
+        this.blacklist = blacklist;
     }
 
     // Load the domains into memory to ensure the crawler is resilient to database blips
-    private List<CrawlSpecRecord> loadData() throws SQLException {
+    private List<CrawlSpecRecord> loadData() throws Exception {
         var domains = new ArrayList<CrawlSpecRecord>();
 
         logger.info("Loading domains to be crawled");
 
+        blacklist.waitUntilLoaded();
+
         try (var conn = dataSource.getConnection();
              var query = conn.prepareStatement("""
-                     SELECT DOMAIN_NAME, COALESCE(KNOWN_URLS, 0)
+                     SELECT DOMAIN_NAME, COALESCE(KNOWN_URLS, 0), EC_DOMAIN.ID
                      FROM EC_DOMAIN
                      LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
                      WHERE NODE_AFFINITY=?
@@ -51,11 +57,17 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider {
             query.setFetchSize(10_000);
             var rs = query.executeQuery();
             while (rs.next()) {
-                domains.add(new CrawlSpecRecord(
-                        rs.getString(1),
-                        Math.clamp((int) (URL_GROWTH_FACTOR * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN),
-                        List.of()
-                ));
+                // Skip blacklisted domains
+                if (blacklist.isBlacklisted(rs.getInt(3)))
+                    continue;
+
+                var record = new CrawlSpecRecord(
+                        rs.getString(1),
+                        Math.clamp((int) (URL_GROWTH_FACTOR * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN),
+                        List.of()
+                );
+
+                domains.add(record);
             }
         }
 
@@ -69,9 +81,41 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider {
         return domains;
     }
 
+    /** Ensure that the domains in the parquet spec provider are loaded into
+     * the database. This mitigates the risk of certain footguns, such as
+     * re-crawling before performing the 'Load' operation, which would
+     * otherwise result in the crawler not being able to find the domain
+     * in the database through the DbCrawlSpecProvider, and thus not
+     * being able to crawl it.
+     */
+    public void ensureParquetDomainsLoaded(ParquetCrawlSpecProvider parquetCrawlSpecProvider) throws Exception {
+
+        // This is a bit of an unhealthy mix of concerns, but it's for the Greater Good (TM)
+
+        try (var conn = dataSource.getConnection();
+             var stmt = conn.prepareStatement("""
+                     INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY)
+                     VALUES (?, ?, ?)
+ """)) + { + parquetCrawlSpecProvider.stream().forEach(record -> { + try { + var domain = new EdgeDomain(record.getDomain()); + stmt.setString(1, record.domain); + stmt.setString(2, domain.topDomain); + stmt.setInt(3, processConfiguration.node()); + stmt.addBatch(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + stmt.executeBatch(); + } + } @Override - public int totalCount() throws SQLException { + public int totalCount() throws Exception { if (domains == null) { domains = loadData(); } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java index 4a309a8d..2b748ced 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java @@ -20,6 +20,7 @@ import nu.marginalia.mqapi.crawling.CrawlRequest; import nu.marginalia.svc.DomainListRefreshService; import java.nio.file.Files; +import java.nio.file.Path; @Singleton public class RecrawlActor extends RecordActorPrototype { @@ -49,7 +50,12 @@ public class RecrawlActor extends RecordActorPrototype { if (crawlStorage == null) yield new Error("Bad storage id"); if (crawlStorage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + crawlStorage.type()); - Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log")); + Path crawlLogPath = crawlStorage.asPath().resolve("crawler.log"); + if (Files.exists(crawlLogPath)) { + // Save the old crawl log + Path crawlLogBackup = crawlStorage.asPath().resolve("crawler.log-" + System.currentTimeMillis()); + Files.move(crawlLogPath, crawlLogBackup); + } refreshService.synchronizeDomainList();