From c73e43f5c9518716f0d820e49a41b4355b924c1c Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Sun, 18 Feb 2024 09:21:39 +0100 Subject: [PATCH] (recrawl) Mitigate recrawl-before-load footgun In the scenario where an operator * Performs a new crawl from spec * Doesn't load the data into the index * Recrawls the data The recrawl will not find the domains in the database, and the crawl log will be overwritten with an empty file, irrecoverably losing the crawl log making it impossible to load! To mitigate the impact similar problems, the change saves a backup of the old crawl log, as well as complains about this happening. More specifically to this exact scenario however, the parquet-loaded domains are also preemptively inserted into the domain database at the start of the crawl. This should help the DbCrawlSpecProvider to find them regardless of loaded state. This may seem a bit redundant, but losing crawl data is arguably the worst type of disaster scenario for this software, so it's arguably merited. --- .../java/nu/marginalia/crawl/CrawlerMain.java | 26 +++++--- .../crawl/spec/DbCrawlSpecProvider.java | 64 ++++++++++++++++--- .../marginalia/actor/task/RecrawlActor.java | 8 ++- 3 files changed, 77 insertions(+), 21 deletions(-) diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java index 8a1ccbe0..580ac3c7 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/CrawlerMain.java @@ -151,22 +151,22 @@ public class CrawlerMain extends ProcessMainClass { System.exit(0); } - public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException { + public void run(CrawlSpecProvider specProvider, Path outputDir) throws Exception { heartbeat.start(); + // First a validation run to ensure the file is all good to parse + totalTasks = specProvider.totalCount(); + if (totalTasks == 0) { + // This is an error state, and we should make noise about it + throw new IllegalStateException("No crawl tasks found, refusing to continue"); + } + logger.info("Queued {} crawl tasks, let's go", totalTasks); + try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log")); WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir); AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains()) ) { - - // First a validation run to ensure the file is all good to parse - logger.info("Validating JSON"); - - totalTasks = specProvider.totalCount(); - - logger.info("Queued {} crawl tasks, let's go", totalTasks); - try (var specStream = specProvider.stream()) { specStream .takeWhile((e) -> abortMonitor.isAlive()) @@ -332,7 +332,13 @@ public class CrawlerMain extends ProcessMainClass { if (request.specStorage != null) { var specData = fileStorageService.getStorage(request.specStorage); - specProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData)); + var parquetProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));; + + // Ensure the parquet domains are loaded into the database to avoid + // rare data-loss scenarios + dbCrawlSpecProvider.ensureParquetDomainsLoaded(parquetProvider); + + specProvider = parquetProvider; } else { specProvider = dbCrawlSpecProvider; diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java index 755cec43..412c2c5f 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/spec/DbCrawlSpecProvider.java @@ -4,11 +4,12 @@ import com.google.inject.Inject; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; import nu.marginalia.ProcessConfiguration; +import nu.marginalia.db.DomainBlacklist; +import nu.marginalia.model.EdgeDomain; import nu.marginalia.model.crawlspec.CrawlSpecRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -17,6 +18,7 @@ import java.util.stream.Stream; public class DbCrawlSpecProvider implements CrawlSpecProvider { private final HikariDataSource dataSource; private final ProcessConfiguration processConfiguration; + private final DomainBlacklist blacklist; private List domains; private static final Logger logger = LoggerFactory.getLogger(DbCrawlSpecProvider.class); @@ -27,21 +29,25 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider { @Inject public DbCrawlSpecProvider(HikariDataSource dataSource, - ProcessConfiguration processConfiguration + ProcessConfiguration processConfiguration, + DomainBlacklist blacklist ) { this.dataSource = dataSource; this.processConfiguration = processConfiguration; + this.blacklist = blacklist; } // Load the domains into memory to ensure the crawler is resilient to database blips - private List loadData() throws SQLException { + private List loadData() throws Exception { var domains = new ArrayList(); logger.info("Loading domains to be crawled"); + blacklist.waitUntilLoaded(); + try (var conn = dataSource.getConnection(); var query = conn.prepareStatement(""" - SELECT DOMAIN_NAME, COALESCE(KNOWN_URLS, 0) + SELECT DOMAIN_NAME, COALESCE(KNOWN_URLS, 0), EC_DOMAIN.ID FROM EC_DOMAIN LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID WHERE NODE_AFFINITY=? @@ -51,11 +57,17 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider { query.setFetchSize(10_000); var rs = query.executeQuery(); while (rs.next()) { - domains.add(new CrawlSpecRecord( - rs.getString(1), - Math.clamp((int) (URL_GROWTH_FACTOR * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN), - List.of() - )); + // Skip blacklisted domains + if (blacklist.isBlacklisted(rs.getInt(3))) + continue; + + var record = new CrawlSpecRecord( + rs.getString(1), + Math.clamp((int) (URL_GROWTH_FACTOR * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN), + List.of() + ); + + domains.add(record); } } @@ -69,9 +81,41 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider { return domains; } + /** Ensure that the domains in the parquet spec provider are loaded into + * the database. This mitigates the risk of certain footguns, such + * re-crawling before performing the 'Load' operation, which would + * otherwise result in the crawler not being able to find the domain + * in the database through the DbCrawlSpecProvider, and thus not + * being able to crawl it. + * */ + public void ensureParquetDomainsLoaded(ParquetCrawlSpecProvider parquetCrawlSpecProvider) throws Exception { + + // This is a bit of an unhealthy mix of concerns, but it's for the Greater Good (TM) + + try (var conn = dataSource.getConnection(); + var stmt = conn.prepareStatement(""" + INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY) + VALUES (?, ?, ?) + """)) + { + parquetCrawlSpecProvider.stream().forEach(record -> { + try { + var domain = new EdgeDomain(record.getDomain()); + stmt.setString(1, record.domain); + stmt.setString(2, domain.topDomain); + stmt.setInt(3, processConfiguration.node()); + stmt.addBatch(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + + stmt.executeBatch(); + } + } @Override - public int totalCount() throws SQLException { + public int totalCount() throws Exception { if (domains == null) { domains = loadData(); } diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java index 4a309a8d..2b748ced 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/RecrawlActor.java @@ -20,6 +20,7 @@ import nu.marginalia.mqapi.crawling.CrawlRequest; import nu.marginalia.svc.DomainListRefreshService; import java.nio.file.Files; +import java.nio.file.Path; @Singleton public class RecrawlActor extends RecordActorPrototype { @@ -49,7 +50,12 @@ public class RecrawlActor extends RecordActorPrototype { if (crawlStorage == null) yield new Error("Bad storage id"); if (crawlStorage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + crawlStorage.type()); - Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log")); + Path crawlLogPath = crawlStorage.asPath().resolve("crawler.log"); + if (Files.exists(crawlLogPath)) { + // Save the old crawl log + Path crawlLogBackup = crawlStorage.asPath().resolve("crawler.log-" + System.currentTimeMillis()); + Files.move(crawlLogPath, crawlLogBackup); + } refreshService.synchronizeDomainList();