(recrawl) Mitigate recrawl-before-load footgun
In the scenario where an operator

* performs a new crawl from spec,
* doesn't load the data into the index, and
* then recrawls,

the recrawl will not find the domains in the database, and the crawl log will be overwritten with an empty file, irrecoverably losing the crawl log and making the data impossible to load!

To mitigate the impact of similar problems, this change saves a backup of the old crawl log and complains loudly when this happens. For this exact scenario specifically, the parquet-loaded domains are also preemptively inserted into the domain database at the start of the crawl, so that the DbCrawlSpecProvider can find them regardless of loaded state. This may seem a bit redundant, but losing crawl data is arguably the worst disaster scenario for this software, so the belt-and-suspenders approach is merited. (A condensed sketch of both mitigations follows below, ahead of the full diff.)
parent e61e7f44b9
commit c73e43f5c9
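For orientation, here is a condensed, self-contained sketch of the two mitigations before the full diff. The names RecrawlFootgunMitigation, backupCrawlLog, DomainSpec and preloadDomains are illustrative stand-ins and do not exist in the codebase; the real logic lives in RecrawlActor and DbCrawlSpecProvider.ensureParquetDomainsLoaded() as shown in the diff below, from which the SQL and file naming scheme are taken.

import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.util.List;
import javax.sql.DataSource;

class RecrawlFootgunMitigation {

    // Mirrors the RecrawlActor change: instead of deleting crawler.log before a
    // recrawl, move it aside to a timestamped backup so an unloaded crawl can
    // still be recovered.
    static void backupCrawlLog(Path crawlStorage) throws Exception {
        Path crawlLogPath = crawlStorage.resolve("crawler.log");
        if (Files.exists(crawlLogPath)) {
            Path backup = crawlStorage.resolve("crawler.log-" + System.currentTimeMillis());
            Files.move(crawlLogPath, backup);
        }
    }

    // Simplified stand-in for CrawlSpecRecord/EdgeDomain; only the fields the
    // INSERT below needs.
    record DomainSpec(String domain, String topDomain) {}

    // Mirrors DbCrawlSpecProvider.ensureParquetDomainsLoaded(): batch-insert the
    // parquet-spec domains into EC_DOMAIN up front. INSERT IGNORE makes this a
    // no-op for domains that already exist, so it only matters when the 'Load'
    // step was skipped.
    static void preloadDomains(DataSource dataSource, List<DomainSpec> specs, int nodeAffinity) throws Exception {
        try (Connection conn = dataSource.getConnection();
             var stmt = conn.prepareStatement("""
                     INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY)
                     VALUES (?, ?, ?)
                     """))
        {
            for (var spec : specs) {
                stmt.setString(1, spec.domain());
                stmt.setString(2, spec.topDomain());
                stmt.setInt(3, nodeAffinity);
                stmt.addBatch();
            }
            stmt.executeBatch();
        }
    }
}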
CrawlerMain.java
@@ -151,22 +151,22 @@ public class CrawlerMain extends ProcessMainClass {
         System.exit(0);
     }
 
-    public void run(CrawlSpecProvider specProvider, Path outputDir) throws InterruptedException, IOException {
+    public void run(CrawlSpecProvider specProvider, Path outputDir) throws Exception {
 
         heartbeat.start();
 
+        // First a validation run to ensure the file is all good to parse
+        totalTasks = specProvider.totalCount();
+        if (totalTasks == 0) {
+            // This is an error state, and we should make noise about it
+            throw new IllegalStateException("No crawl tasks found, refusing to continue");
+        }
+        logger.info("Queued {} crawl tasks, let's go", totalTasks);
+
         try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
              WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
              AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
         ) {
-
-            // First a validation run to ensure the file is all good to parse
-            logger.info("Validating JSON");
-
-            totalTasks = specProvider.totalCount();
-
-            logger.info("Queued {} crawl tasks, let's go", totalTasks);
-
             try (var specStream = specProvider.stream()) {
                 specStream
                         .takeWhile((e) -> abortMonitor.isAlive())
@@ -332,7 +332,13 @@ public class CrawlerMain extends ProcessMainClass {
 
             if (request.specStorage != null) {
                 var specData = fileStorageService.getStorage(request.specStorage);
-                specProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));
+                var parquetProvider = new ParquetCrawlSpecProvider(CrawlSpecFileNames.resolve(specData));;
+
+                // Ensure the parquet domains are loaded into the database to avoid
+                // rare data-loss scenarios
+                dbCrawlSpecProvider.ensureParquetDomainsLoaded(parquetProvider);
+
+                specProvider = parquetProvider;
             }
             else {
                 specProvider = dbCrawlSpecProvider;
DbCrawlSpecProvider.java
@@ -4,11 +4,12 @@ import com.google.inject.Inject;
 import com.zaxxer.hikari.HikariDataSource;
 import lombok.SneakyThrows;
 import nu.marginalia.ProcessConfiguration;
+import nu.marginalia.db.DomainBlacklist;
+import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.model.crawlspec.CrawlSpecRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -17,6 +18,7 @@ import java.util.stream.Stream;
 public class DbCrawlSpecProvider implements CrawlSpecProvider {
     private final HikariDataSource dataSource;
     private final ProcessConfiguration processConfiguration;
+    private final DomainBlacklist blacklist;
     private List<CrawlSpecRecord> domains;
 
     private static final Logger logger = LoggerFactory.getLogger(DbCrawlSpecProvider.class);
@@ -27,21 +29,25 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider {
 
     @Inject
     public DbCrawlSpecProvider(HikariDataSource dataSource,
-                               ProcessConfiguration processConfiguration
+                               ProcessConfiguration processConfiguration,
+                               DomainBlacklist blacklist
     ) {
         this.dataSource = dataSource;
         this.processConfiguration = processConfiguration;
+        this.blacklist = blacklist;
     }
 
     // Load the domains into memory to ensure the crawler is resilient to database blips
-    private List<CrawlSpecRecord> loadData() throws SQLException {
+    private List<CrawlSpecRecord> loadData() throws Exception {
         var domains = new ArrayList<CrawlSpecRecord>();
 
         logger.info("Loading domains to be crawled");
 
+        blacklist.waitUntilLoaded();
+
         try (var conn = dataSource.getConnection();
              var query = conn.prepareStatement("""
-                     SELECT DOMAIN_NAME, COALESCE(KNOWN_URLS, 0)
+                     SELECT DOMAIN_NAME, COALESCE(KNOWN_URLS, 0), EC_DOMAIN.ID
                      FROM EC_DOMAIN
                      LEFT JOIN DOMAIN_METADATA ON EC_DOMAIN.ID=DOMAIN_METADATA.ID
                      WHERE NODE_AFFINITY=?
@@ -51,11 +57,17 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider {
             query.setFetchSize(10_000);
             var rs = query.executeQuery();
             while (rs.next()) {
-                domains.add(new CrawlSpecRecord(
+                // Skip blacklisted domains
+                if (blacklist.isBlacklisted(rs.getInt(3)))
+                    continue;
+
+                var record = new CrawlSpecRecord(
                         rs.getString(1),
                         Math.clamp((int) (URL_GROWTH_FACTOR * rs.getInt(2)), MIN_URLS_PER_DOMAIN, MAX_URLS_PER_DOMAIN),
                         List.of()
-                ));
+                );
+
+                domains.add(record);
             }
         }
 
@@ -69,9 +81,41 @@ public class DbCrawlSpecProvider implements CrawlSpecProvider {
         return domains;
     }
 
+    /** Ensure that the domains in the parquet spec provider are loaded into
+     *  the database. This mitigates the risk of certain footguns, such
+     *  re-crawling before performing the 'Load' operation, which would
+     *  otherwise result in the crawler not being able to find the domain
+     *  in the database through the DbCrawlSpecProvider, and thus not
+     *  being able to crawl it.
+     * */
+    public void ensureParquetDomainsLoaded(ParquetCrawlSpecProvider parquetCrawlSpecProvider) throws Exception {
+
+        // This is a bit of an unhealthy mix of concerns, but it's for the Greater Good (TM)
+
+        try (var conn = dataSource.getConnection();
+             var stmt = conn.prepareStatement("""
+                     INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP, NODE_AFFINITY)
+                     VALUES (?, ?, ?)
+                     """))
+        {
+            parquetCrawlSpecProvider.stream().forEach(record -> {
+                try {
+                    var domain = new EdgeDomain(record.getDomain());
+                    stmt.setString(1, record.domain);
+                    stmt.setString(2, domain.topDomain);
+                    stmt.setInt(3, processConfiguration.node());
+                    stmt.addBatch();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+            });
+
+            stmt.executeBatch();
+        }
+    }
+
     @Override
-    public int totalCount() throws SQLException {
+    public int totalCount() throws Exception {
         if (domains == null) {
             domains = loadData();
         }
RecrawlActor.java
@@ -20,6 +20,7 @@ import nu.marginalia.mqapi.crawling.CrawlRequest;
 import nu.marginalia.svc.DomainListRefreshService;
 
 import java.nio.file.Files;
+import java.nio.file.Path;
 
 @Singleton
 public class RecrawlActor extends RecordActorPrototype {
@@ -49,7 +50,12 @@ public class RecrawlActor extends RecordActorPrototype {
                 if (crawlStorage == null) yield new Error("Bad storage id");
                 if (crawlStorage.type() != FileStorageType.CRAWL_DATA) yield new Error("Bad storage type " + crawlStorage.type());
 
-                Files.deleteIfExists(crawlStorage.asPath().resolve("crawler.log"));
+                Path crawlLogPath = crawlStorage.asPath().resolve("crawler.log");
+                if (Files.exists(crawlLogPath)) {
+                    // Save the old crawl log
+                    Path crawlLogBackup = crawlStorage.asPath().resolve("crawler.log-" + System.currentTimeMillis());
+                    Files.move(crawlLogPath, crawlLogBackup);
+                }
 
                 refreshService.synchronizeDomainList();
 