(warc) Toggle for saving WARC data
Add a toggle for saving the WARC data generated by the search engine's crawler. Normally this is discarded, but for debugging or archival purposes, retaining it may be of interest. The warc files are concatenated into larger archives, up to about 1 GB each. An index is also created containing filenames, domain names, offsets and sizes to help navigate these larger archives. The warc data is saved in a directory warc/ under the crawl data storage.
This commit is contained in:
parent
264e2db539
commit
0caef1b307
@ -17,16 +17,17 @@ public class NodeConfigurationService {
|
||||
this.dataSource = dataSource;
|
||||
}
|
||||
|
||||
public NodeConfiguration create(int id, String description, boolean acceptQueries) throws SQLException {
|
||||
public NodeConfiguration create(int id, String description, boolean acceptQueries, boolean keepWarcs) throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var is = conn.prepareStatement("""
|
||||
INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES) VALUES(?, ?, ?)
|
||||
INSERT IGNORE INTO NODE_CONFIGURATION(ID, DESCRIPTION, ACCEPT_QUERIES) VALUES(?, ?, ?, ?)
|
||||
""")
|
||||
)
|
||||
{
|
||||
is.setInt(1, id);
|
||||
is.setString(2, description);
|
||||
is.setBoolean(3, acceptQueries);
|
||||
is.setBoolean(4, keepWarcs);
|
||||
|
||||
if (is.executeUpdate() <= 0) {
|
||||
throw new IllegalStateException("Failed to insert configuration");
|
||||
@ -39,7 +40,7 @@ public class NodeConfigurationService {
|
||||
public List<NodeConfiguration> getAll() throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var qs = conn.prepareStatement("""
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, DISABLED
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED
|
||||
FROM NODE_CONFIGURATION
|
||||
""")) {
|
||||
var rs = qs.executeQuery();
|
||||
@ -53,6 +54,7 @@ public class NodeConfigurationService {
|
||||
rs.getBoolean("ACCEPT_QUERIES"),
|
||||
rs.getBoolean("AUTO_CLEAN"),
|
||||
rs.getBoolean("PRECESSION"),
|
||||
rs.getBoolean("KEEP_WARCS"),
|
||||
rs.getBoolean("DISABLED")
|
||||
));
|
||||
}
|
||||
@ -63,7 +65,7 @@ public class NodeConfigurationService {
|
||||
public NodeConfiguration get(int nodeId) throws SQLException {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var qs = conn.prepareStatement("""
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, DISABLED
|
||||
SELECT ID, DESCRIPTION, ACCEPT_QUERIES, AUTO_CLEAN, PRECESSION, KEEP_WARCS, DISABLED
|
||||
FROM NODE_CONFIGURATION
|
||||
WHERE ID=?
|
||||
""")) {
|
||||
@ -76,6 +78,7 @@ public class NodeConfigurationService {
|
||||
rs.getBoolean("ACCEPT_QUERIES"),
|
||||
rs.getBoolean("AUTO_CLEAN"),
|
||||
rs.getBoolean("PRECESSION"),
|
||||
rs.getBoolean("KEEP_WARCS"),
|
||||
rs.getBoolean("DISABLED")
|
||||
);
|
||||
}
|
||||
@ -88,7 +91,7 @@ public class NodeConfigurationService {
|
||||
try (var conn = dataSource.getConnection();
|
||||
var us = conn.prepareStatement("""
|
||||
UPDATE NODE_CONFIGURATION
|
||||
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, DISABLED=?
|
||||
SET DESCRIPTION=?, ACCEPT_QUERIES=?, AUTO_CLEAN=?, PRECESSION=?, KEEP_WARCS=?, DISABLED=?
|
||||
WHERE ID=?
|
||||
"""))
|
||||
{
|
||||
@ -96,8 +99,9 @@ public class NodeConfigurationService {
|
||||
us.setBoolean(2, config.acceptQueries());
|
||||
us.setBoolean(3, config.autoClean());
|
||||
us.setBoolean(4, config.includeInPrecession());
|
||||
us.setBoolean(5, config.disabled());
|
||||
us.setInt(6, config.node());
|
||||
us.setBoolean(5, config.keepWarcs());
|
||||
us.setBoolean(6, config.disabled());
|
||||
us.setInt(7, config.node());
|
||||
|
||||
if (us.executeUpdate() <= 0)
|
||||
throw new IllegalStateException("Failed to update configuration");
|
||||
|
@ -5,6 +5,7 @@ public record NodeConfiguration(int node,
|
||||
boolean acceptQueries,
|
||||
boolean autoClean,
|
||||
boolean includeInPrecession,
|
||||
boolean keepWarcs,
|
||||
boolean disabled
|
||||
)
|
||||
{
|
||||
|
@ -50,8 +50,8 @@ public class NodeConfigurationServiceTest {
|
||||
|
||||
@Test
|
||||
public void test() throws SQLException {
|
||||
var a = nodeConfigurationService.create(1, "Test", false);
|
||||
var b = nodeConfigurationService.create(2, "Foo", true);
|
||||
var a = nodeConfigurationService.create(1, "Test", false, false);
|
||||
var b = nodeConfigurationService.create(2, "Foo", true, false);
|
||||
|
||||
assertEquals(1, a.node());
|
||||
assertEquals("Test", a.description());
|
||||
|
@ -0,0 +1 @@
|
||||
ALTER TABLE WMSA_prod.NODE_CONFIGURATION ADD COLUMN KEEP_WARCS BOOLEAN DEFAULT FALSE;
|
@ -57,7 +57,7 @@ public class NodeStatusWatcher {
|
||||
|
||||
private void setupNode() {
|
||||
try {
|
||||
configurationService.create(nodeId, "Node " + nodeId, nodeId == 1);
|
||||
configurationService.create(nodeId, "Node " + nodeId, nodeId == 1, false);
|
||||
fileStorageService.createStorageBase("Index Data", Path.of("/idx"), nodeId, FileStorageBaseType.CURRENT);
|
||||
fileStorageService.createStorageBase("Index Backups", Path.of("/backup"), nodeId, FileStorageBaseType.BACKUP);
|
||||
fileStorageService.createStorageBase("Crawl Data", Path.of("/storage"), nodeId, FileStorageBaseType.STORAGE);
|
||||
|
@ -17,6 +17,8 @@ import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
|
||||
import nu.marginalia.crawl.spec.CrawlSpecProvider;
|
||||
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
|
||||
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
|
||||
import nu.marginalia.crawl.warc.WarcArchiverFactory;
|
||||
import nu.marginalia.crawl.warc.WarcArchiverIf;
|
||||
import nu.marginalia.crawling.io.CrawledDomainReader;
|
||||
import nu.marginalia.crawling.io.CrawlerOutputFile;
|
||||
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileWriter;
|
||||
@ -59,6 +61,7 @@ public class CrawlerMain {
|
||||
private final FileStorageService fileStorageService;
|
||||
private final DbCrawlSpecProvider dbCrawlSpecProvider;
|
||||
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
|
||||
private final WarcArchiverFactory warcArchiverFactory;
|
||||
private final Gson gson;
|
||||
private final int node;
|
||||
private final SimpleBlockingThreadPool pool;
|
||||
@ -79,6 +82,7 @@ public class CrawlerMain {
|
||||
ProcessConfiguration processConfiguration,
|
||||
DbCrawlSpecProvider dbCrawlSpecProvider,
|
||||
AnchorTagsSourceFactory anchorTagsSourceFactory,
|
||||
WarcArchiverFactory warcArchiverFactory,
|
||||
Gson gson) {
|
||||
this.userAgent = userAgent;
|
||||
this.heartbeat = heartbeat;
|
||||
@ -87,6 +91,7 @@ public class CrawlerMain {
|
||||
this.fileStorageService = fileStorageService;
|
||||
this.dbCrawlSpecProvider = dbCrawlSpecProvider;
|
||||
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
|
||||
this.warcArchiverFactory = warcArchiverFactory;
|
||||
this.gson = gson;
|
||||
this.node = processConfiguration.node();
|
||||
|
||||
@ -150,6 +155,7 @@ public class CrawlerMain {
|
||||
heartbeat.start();
|
||||
|
||||
try (WorkLog workLog = new WorkLog(outputDir.resolve("crawler.log"));
|
||||
WarcArchiverIf warcArchiver = warcArchiverFactory.get(outputDir);
|
||||
AnchorTagsSource anchorTagsSource = anchorTagsSourceFactory.create(specProvider.getDomains())
|
||||
) {
|
||||
|
||||
@ -165,7 +171,7 @@ public class CrawlerMain {
|
||||
.takeWhile((e) -> abortMonitor.isAlive())
|
||||
.filter(e -> !workLog.isJobFinished(e.domain))
|
||||
.filter(e -> processingIds.put(e.domain, "") == null)
|
||||
.map(e -> new CrawlTask(e, anchorTagsSource, outputDir, workLog))
|
||||
.map(e -> new CrawlTask(e, anchorTagsSource, outputDir, warcArchiver, workLog))
|
||||
.forEach(pool::submitQuietly);
|
||||
}
|
||||
|
||||
@ -202,15 +208,18 @@ public class CrawlerMain {
|
||||
|
||||
private final AnchorTagsSource anchorTagsSource;
|
||||
private final Path outputDir;
|
||||
private final WarcArchiverIf warcArchiver;
|
||||
private final WorkLog workLog;
|
||||
|
||||
CrawlTask(CrawlSpecRecord specification,
|
||||
AnchorTagsSource anchorTagsSource,
|
||||
Path outputDir,
|
||||
WarcArchiverIf warcArchiver,
|
||||
WorkLog workLog) {
|
||||
this.specification = specification;
|
||||
this.anchorTagsSource = anchorTagsSource;
|
||||
this.outputDir = outputDir;
|
||||
this.warcArchiver = warcArchiver;
|
||||
this.workLog = workLog;
|
||||
|
||||
this.domain = specification.domain;
|
||||
@ -253,6 +262,8 @@ public class CrawlerMain {
|
||||
CrawledDocumentParquetRecordFileWriter
|
||||
.convertWarc(domain, userAgent, newWarcFile, parquetFile);
|
||||
|
||||
warcArchiver.consumeWarc(newWarcFile, domain);
|
||||
|
||||
workLog.setJobToFinished(domain, parquetFile.toString(), size);
|
||||
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
|
||||
|
||||
|
@ -0,0 +1,122 @@
|
||||
package nu.marginalia.crawl.warc;
|
||||
|
||||
import com.google.inject.Inject;
|
||||
import nu.marginalia.ProcessConfiguration;
|
||||
import nu.marginalia.nodecfg.NodeConfigurationService;
|
||||
import org.apache.commons.io.IOUtils;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.io.PrintWriter;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.time.LocalDateTime;
|
||||
import java.time.format.DateTimeFormatter;
|
||||
|
||||
/** Factory for creating WarcArchiverIf instances. Depending on the node's configuration,
|
||||
* either a shredder instance that just discards the Warc file, or a persisting instance
|
||||
* that creates a series of concatenated warc.gz-files with an index
|
||||
*/
|
||||
public class WarcArchiverFactory {
|
||||
private final boolean keepWarcs;
|
||||
|
||||
@Inject
|
||||
public WarcArchiverFactory(ProcessConfiguration processConfiguration,
|
||||
NodeConfigurationService nodeConfigurationService)
|
||||
throws Exception
|
||||
{
|
||||
keepWarcs = nodeConfigurationService.get(processConfiguration.node()).keepWarcs();
|
||||
}
|
||||
|
||||
public WarcArchiverIf get(Path outputDir) throws IOException {
|
||||
if (!keepWarcs) {
|
||||
return new WarcArchiverShredder();
|
||||
} else {
|
||||
return new WarcArchiver(outputDir);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/** Dummy archiver that just deletes the warc file. */
|
||||
class WarcArchiverShredder implements WarcArchiverIf {
|
||||
@Override
|
||||
public void consumeWarc(Path warcFile, String domain) throws IOException {
|
||||
Files.deleteIfExists(warcFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {}
|
||||
}
|
||||
|
||||
/** Archives warc files to disk. Concatenates all warc files into a single
|
||||
* warc file, and creates an index file with the offsets and lengths of
|
||||
* each domain segment.
|
||||
* */
|
||||
class WarcArchiver implements WarcArchiverIf {
|
||||
// Specs say the recommended maximum size of a warc file is ~1GB, after which a new file should be created
|
||||
private static final long MAX_COMBINED_WARC_FILE_SIZE = 1_000_000_000;
|
||||
|
||||
|
||||
private PrintWriter indexWriter;
|
||||
private OutputStream warcWriter;
|
||||
private final Path warcDir;
|
||||
|
||||
String warcFileName = null;
|
||||
String ts = LocalDateTime.now()
|
||||
.format(DateTimeFormatter.ISO_LOCAL_DATE_TIME)
|
||||
.replace(':', '-');
|
||||
|
||||
long pos = 0;
|
||||
int fileCounter = 0;
|
||||
|
||||
public WarcArchiver(Path outputDir) throws IOException {
|
||||
warcDir = outputDir.resolve("warc");
|
||||
|
||||
if (!Files.exists(warcDir)) {
|
||||
Files.createDirectories(warcDir);
|
||||
}
|
||||
|
||||
switchFile();
|
||||
}
|
||||
|
||||
private void switchFile() throws IOException {
|
||||
if (warcWriter != null) warcWriter.close();
|
||||
|
||||
warcFileName = "marginalia-crawl-" + ts + "--" + String.format("%04d", fileCounter++) + ".warc.gz";
|
||||
|
||||
warcWriter = Files.newOutputStream(warcDir.resolve(warcFileName));
|
||||
|
||||
if (indexWriter == null) {
|
||||
Path indexFile = warcDir.resolve("marginalia-crawl-" + ts + ".idx");
|
||||
indexWriter = new PrintWriter(Files.newBufferedWriter(indexFile));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consumeWarc(Path warcFile, String domain) throws IOException {
|
||||
try {
|
||||
synchronized (this) {
|
||||
// Specs say the recommended maximum size of a warc file is ~1GB
|
||||
if (pos > MAX_COMBINED_WARC_FILE_SIZE) {
|
||||
switchFile();
|
||||
}
|
||||
|
||||
indexWriter.printf("%s %s %d %d\n", warcFileName, domain, pos, Files.size(warcFile));
|
||||
indexWriter.flush();
|
||||
try (var is = Files.newInputStream(warcFile)) {
|
||||
pos += IOUtils.copy(is, warcWriter);
|
||||
}
|
||||
}
|
||||
}
|
||||
finally {
|
||||
Files.deleteIfExists(warcFile);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (warcWriter != null) warcWriter.close();
|
||||
if (indexWriter != null) indexWriter.close();
|
||||
}
|
||||
}
|
@ -0,0 +1,12 @@
|
||||
package nu.marginalia.crawl.warc;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
|
||||
public interface WarcArchiverIf extends AutoCloseable {
|
||||
/** Process the warc file. After processing, the warc file is deleted.
|
||||
* Processing may be a no-op, depending on the implementation.
|
||||
*/
|
||||
void consumeWarc(Path warcFile, String domain) throws IOException;
|
||||
void close() throws IOException;
|
||||
}
|
@ -300,6 +300,7 @@ public class ControlNodeService {
|
||||
"on".equalsIgnoreCase(request.queryParams("acceptQueries")),
|
||||
"on".equalsIgnoreCase(request.queryParams("autoClean")),
|
||||
"on".equalsIgnoreCase(request.queryParams("includeInPrecession")),
|
||||
"on".equalsIgnoreCase(request.queryParams("keepWarcs")),
|
||||
"on".equalsIgnoreCase(request.queryParams("disabled"))
|
||||
);
|
||||
|
||||
|
@ -44,6 +44,17 @@
|
||||
<div class="form-text">If true, this node will be included in the crawling precession.</div>
|
||||
</div>
|
||||
|
||||
<div class="form-check form-switch">
|
||||
<input class="form-check-input" type="checkbox" role="switch" name="keepWarcs" {{#if config.keepWarcs}}checked{{/if}}>
|
||||
<label class="form-check-label" for="includeInPrecession">Keep WARC files during crawling</label>
|
||||
|
||||
<div class="form-text">This toggle makes the crawler retain copies of the WebARChive data that is
|
||||
normally an intermediate product of the crawling. This is useful for debugging
|
||||
and testing, but the WARC files are large and take up a lot of space. Unless
|
||||
there is a need for exporting these files, it is recommended to leave this off.
|
||||
</div>
|
||||
</div>
|
||||
|
||||
<div class="form-check form-switch mb-3">
|
||||
<input class="form-check-input" type="checkbox" role="switch" name="disabled" {{#if config.disabled}}checked{{/if}}>
|
||||
<label class="form-check-label" for="disabled">Disabled</label>
|
||||
|
Loading…
Reference in New Issue
Block a user