(converter) Route sizeHint to SideloadProcessing
Route the sizeHint from the input parquet file to SideloadProcessing, so that it can set the domain's sizeloadSizeAdvice field from the actual size of the input instead of a fixed "large" number (10,000 is still used as a fallback when the hint is 0). This is necessary to populate the KNOWN_URL column in the domain data table, which matters because that column is used, for example, to calculate how much of the site to re-crawl in the future.
This commit is contained in:
parent
0b112cb4d4
commit
7ba296ccdf
2 changed files with 10 additions and 10 deletions
|
@ -57,19 +57,20 @@ public class DomainProcessor {
|
|||
}
|
||||
|
||||
public ConverterBatchWritableIf createWritable(SerializableCrawlDataStream domain) {
|
||||
if (domain.sizeHint() > 10_000) {
|
||||
final int sizeHint = domain.sizeHint();
|
||||
|
||||
if (sizeHint > 10_000) {
|
||||
// If the file is too big, we run a processing mode that doesn't
|
||||
// require loading the entire dataset into RAM
|
||||
logger.info("Sideloading {}", domain.path());
|
||||
return sideloadProcessing(domain);
|
||||
return sideloadProcessing(domain, sizeHint);
|
||||
}
|
||||
|
||||
return fullProcessing(domain);
|
||||
}
|
||||
|
||||
public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream) {
|
||||
public SideloadProcessing sideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) {
|
||||
try {
|
||||
return new SideloadProcessing(dataStream);
|
||||
return new SideloadProcessing(dataStream, sizeHint);
|
||||
}
|
||||
catch (Exception ex) {
|
||||
logger.warn("Failed to process domain sideload", ex);
|
||||
|
@ -86,17 +87,16 @@ public class DomainProcessor {
|
|||
private final DomainLinks externalDomainLinks;
|
||||
private final LshDocumentDeduplicator deduplicator = new LshDocumentDeduplicator();
|
||||
|
||||
SideloadProcessing(SerializableCrawlDataStream dataStream) throws IOException {
|
||||
SideloadProcessing(SerializableCrawlDataStream dataStream, int sizeHint) throws IOException {
|
||||
this.dataStream = dataStream;
|
||||
|
||||
if (!dataStream.hasNext()
|
||||
|| !(dataStream.next() instanceof CrawledDomain crawledDomain))
|
||||
if (!dataStream.hasNext() || !(dataStream.next() instanceof CrawledDomain crawledDomain))
|
||||
{
|
||||
throw new IllegalStateException("First record must be a domain, was " + dataStream.next().getClass().getSimpleName());
|
||||
}
|
||||
|
||||
domain = new ProcessedDomain();
|
||||
domain.sizeloadSizeAdvice = 10_000;
|
||||
domain.sizeloadSizeAdvice = sizeHint == 0 ? 10_000 : sizeHint;
|
||||
|
||||
documentDecorator = new DocumentDecorator(anchorTextKeywords);
|
||||
processDomain(crawledDomain, domain, documentDecorator);
|
||||
|
|
|
@ -96,7 +96,7 @@ public class ConvertingIntegrationTest {
|
|||
|
||||
@Test
|
||||
public void testMemexMarginaliaNuSideloadProcessing() throws IOException {
|
||||
var ret = domainProcessor.sideloadProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()));
|
||||
var ret = domainProcessor.sideloadProcessing(asSerializableCrawlData(readMarginaliaWorkingSet()), 100);
|
||||
assertNotNull(ret);
|
||||
assertEquals("memex.marginalia.nu", ret.id());
|
||||
|
||||
|
|
Loading…
Reference in a new issue