(converter) Refactor the DomainProcessor for the new crawl data format

With the new crawler modifications, the crawl data arrives in a slightly different order, which lets us optimize the converter. This is a breaking change that is incompatible with the old style of crawl data, so it will linger as a branch for a while.

The first step is to move document-level logic out of the domain processor and into the document processor.
Viktor Lofgren 2023-12-27 13:57:59 +01:00
parent 9707366348
commit acf7bcc7a6
5 changed files with 134 additions and 137 deletions

View File

@@ -0,0 +1,33 @@
+package nu.marginalia.converting.processor;
+
+import nu.marginalia.atags.AnchorTextKeywords;
+import nu.marginalia.atags.model.DomainLinks;
+import nu.marginalia.converting.model.ProcessedDocument;
+
+import java.util.HashSet;
+import java.util.Set;
+
+public class DocumentDecorator {
+    private final Set<String> extraSearchTerms = new HashSet<>();
+    private final AnchorTextKeywords keywords;
+    private final DomainLinks externalDomainLinks;
+
+    public DocumentDecorator(AnchorTextKeywords keywords, DomainLinks externalDomainLinks) {
+        this.keywords = keywords;
+        this.externalDomainLinks = externalDomainLinks;
+    }
+
+    public void addTerm(String term) {
+        extraSearchTerms.add(term);
+    }
+
+    public void apply(ProcessedDocument doc) {
+        if (doc == null)
+            return;
+        if (doc.words == null)
+            return;
+
+        doc.words.addAllSyntheticTerms(extraSearchTerms);
+        doc.words.addAnchorTerms(keywords.getAnchorTextKeywords(externalDomainLinks, doc.url));
+    }
+}
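
For context, the decorator collects domain-level synthetic terms once and then stamps them, along with anchor-text keywords, onto every document from that domain. A minimal usage sketch (hypothetical wiring, not part of this commit; the identifiers mirror the DomainProcessor code further down):

    // Hypothetical sketch of how DocumentDecorator is meant to be used.
    // One decorator per domain; terms registered here apply to all of its documents.
    DomainLinks links = anchorTagsSource.getAnchorTags(domain);
    DocumentDecorator decorator = new DocumentDecorator(anchorTextKeywords, links);

    decorator.addTerm("ip:127.0.0.1");       // domain-level synthetic terms...
    decorator.addTerm("special:academia");   // ...collected once per domain

    for (ProcessedDocument doc : docs) {
        decorator.apply(doc);  // adds the synthetic terms and anchor-text keywords to doc.words
    }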

View File

@@ -4,6 +4,7 @@ import com.google.inject.Inject;
 import nu.marginalia.atags.model.DomainLinks;
 import nu.marginalia.crawling.model.CrawledDocument;
 import nu.marginalia.crawling.model.CrawlerDocumentStatus;
+import nu.marginalia.model.crawl.HtmlFeature;
 import nu.marginalia.model.crawl.UrlIndexingState;
 import nu.marginalia.converting.model.DisqualifiedException;
 import nu.marginalia.converting.model.ProcessedDocument;
@@ -38,7 +39,7 @@ public class DocumentProcessor {
         processorPlugins.add(plainTextDocumentProcessorPlugin);
     }

-    public ProcessedDocument process(CrawledDocument crawledDocument, DomainLinks externalDomainLinks) {
+    public ProcessedDocument process(CrawledDocument crawledDocument, DomainLinks externalDomainLinks, DocumentDecorator documentDecorator) {
         ProcessedDocument ret = new ProcessedDocument();

         try {
@@ -51,7 +52,7 @@ public class DocumentProcessor {
                 default -> DocumentClass.EXTERNALLY_LINKED_MULTI;
             };

-            processDocument(crawledDocument, documentClass, ret);
+            processDocument(crawledDocument, documentClass, documentDecorator, ret);
         }
         catch (DisqualifiedException ex) {
             ret.state = UrlIndexingState.DISQUALIFIED;
@@ -67,7 +68,7 @@ public class DocumentProcessor {
         return ret;
     }

-    private void processDocument(CrawledDocument crawledDocument, DocumentClass documentClass, ProcessedDocument ret) throws URISyntaxException, DisqualifiedException {
+    private void processDocument(CrawledDocument crawledDocument, DocumentClass documentClass, DocumentDecorator documentDecorator, ProcessedDocument ret) throws URISyntaxException, DisqualifiedException {

         var crawlerStatus = CrawlerDocumentStatus.valueOf(crawledDocument.crawlerStatus);
         if (crawlerStatus != CrawlerDocumentStatus.OK) {
@@ -90,6 +91,16 @@ public class DocumentProcessor {
         ret.details = detailsWithWords.details();
         ret.words = detailsWithWords.words();

+        documentDecorator.apply(ret);
+
+        if (Boolean.TRUE.equals(crawledDocument.hasCookies)
+                && ret.details != null
+                && ret.details.features != null)
+        {
+            ret.details.features.add(HtmlFeature.COOKIES);
+        }
+
     }

     private AbstractDocumentProcessorPlugin findPlugin(CrawledDocument crawledDocument) throws DisqualifiedException {

View File

@@ -17,7 +17,6 @@ import nu.marginalia.converting.model.ProcessedDomain;
 import nu.marginalia.model.EdgeDomain;
 import nu.marginalia.converting.processor.logic.links.TopKeywords;
 import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator;
-import nu.marginalia.model.crawl.HtmlFeature;
 import org.apache.commons.lang3.StringUtils;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
@@ -32,7 +31,6 @@ public class DomainProcessor {
     private final SiteWords siteWords;
     private final AnchorTagsSource anchorTagsSource;
     private final AnchorTextKeywords anchorTextKeywords;
-    private final LshDocumentDeduplicator documentDeduplicator;
     private final GeoIpDictionary geoIpDictionary;

     private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -42,12 +40,11 @@ public class DomainProcessor {
                            SiteWords siteWords,
                            AnchorTagsSourceFactory anchorTagsSourceFactory,
                            AnchorTextKeywords anchorTextKeywords,
-                           LshDocumentDeduplicator documentDeduplicator, GeoIpDictionary geoIpDictionary) throws SQLException
+                           GeoIpDictionary geoIpDictionary) throws SQLException
     {
         this.documentProcessor = documentProcessor;
         this.siteWords = siteWords;
         this.anchorTextKeywords = anchorTextKeywords;
-        this.documentDeduplicator = documentDeduplicator;
         this.anchorTagsSource = anchorTagsSourceFactory.create();
         this.geoIpDictionary = geoIpDictionary;
@@ -61,117 +58,101 @@ public class DomainProcessor {
             return null;
         }

-        var ret = new ProcessedDomain();
+        ProcessedDomain ret = new ProcessedDomain();
         List<ProcessedDocument> docs = new ArrayList<>();
         Set<String> processedUrls = new HashSet<>();

-        boolean cookies = false;
-        String ip = "";
-
         DomainLinks externalDomainLinks = null;

-        while (dataStream.hasNext()) {
-            var data = dataStream.next();
-
-            // Do a lazy load of the external domain links since we don't know the domain
-            // until we see the first document
-            if (externalDomainLinks == null) {
-                var domain = data.getDomain();
-
-                if (domain != null) {
-                    externalDomainLinks = anchorTagsSource.getAnchorTags(domain);
-                }
-            }
-
-            if (data instanceof CrawledDomain crawledDomain) {
-                ret.domain = new EdgeDomain(crawledDomain.domain);
-                ret.ip = crawledDomain.ip;
-
-                cookies = crawledDomain.hasCookies();
-                ip = crawledDomain.ip;
-
-                if (crawledDomain.redirectDomain != null) {
-                    ret.redirect = new EdgeDomain(crawledDomain.redirectDomain);
-                }
-                ret.documents = docs;
-                ret.state = getState(crawledDomain.crawlerStatus);
-            }
-            else if (data instanceof CrawledDocument doc) {
-                try {
-                    if (doc.url == null || !processedUrls.add(doc.url))
-                        continue;
-
-                    if (Boolean.TRUE.equals(doc.hasCookies)) {
-                        cookies = true;
-                    }
-
-                    // This case should never be reachable, as we should have initiated
-                    // the externalDomainLinks variable above if we made it past the
-                    // doc.url == null check; but we'll leave it here just in case
-                    // to make debugging easier if we break this.
-                    assert externalDomainLinks != null : "externalDomainLinks has not been initialized";
-
-                    docs.add(documentProcessor.process(doc, externalDomainLinks));
-                }
-                catch (Exception ex) {
-                    logger.warn("Failed to process " + doc.url, ex);
-                }
-            }
-        }
-
-        // Add late keywords and features from domain-level information
-        List<String> terms = new ArrayList<>();
-
-        addIpInfo(terms, ip);
-
-        if (cookies) {
-            terms.add(HtmlFeature.COOKIES.getKeyword());
-        }
-
-        if (isAcademicDomain(ret.domain)) {
-            terms.add("special:academia");
-        }
-
-        for (var document : ret.documents) {
-            if (document.details == null)
-                continue;
-
-            if (cookies) {
-                document.details.features.add(HtmlFeature.COOKIES);
-            }
-
-            document.words.addAllSyntheticTerms(terms);
-            document.words.addAnchorTerms(
-                anchorTextKeywords.getAnchorTextKeywords(externalDomainLinks, document.url)
-            );
-        }
-
-        documentDeduplicator.deduplicate(ret.documents);
+        DocumentDecorator documentDecorator = null;
+
+        try (var deduplicator = new LshDocumentDeduplicator()){
+            while (dataStream.hasNext()) {
+                var data = dataStream.next();
+
+                // Do a lazy load of the external domain links since we don't know the domain
+                // until we see the first document
+                if (externalDomainLinks == null) {
+                    var domain = data.getDomain();
+
+                    if (domain != null) {
+                        externalDomainLinks = anchorTagsSource.getAnchorTags(domain);
+                    }
+                }
+
+                if (data instanceof CrawledDomain crawledDomain) {
+                    documentDecorator = new DocumentDecorator(anchorTextKeywords, externalDomainLinks);
+
+                    ret = processDomain(crawledDomain, ret, documentDecorator);
+
+                    ret.documents = docs;
+                } else if (data instanceof CrawledDocument doc) {
+                    try {
+                        if (doc.url == null || !processedUrls.add(doc.url))
+                            continue;
+
+                        var processedDoc = documentProcessor.process(doc, externalDomainLinks, documentDecorator);
+
+                        deduplicator.markIfDuplicate(processedDoc);
+
+                        docs.add(processedDoc);
+                    } catch (Exception ex) {
+                        logger.warn("Failed to process " + doc.url, ex);
+                    }
+                }
+            }
+        }
+
         calculateStatistics(ret, externalDomainLinks);

         return ret;
     }

-    private void addIpInfo(List<String> terms, String ip) {
-        terms.add("ip:"+ip);
+    private ProcessedDomain processDomain(CrawledDomain crawledDomain,
+                                          ProcessedDomain ret,
+                                          DocumentDecorator decorator)
+    {
+        ret.domain = new EdgeDomain(crawledDomain.domain);
+        ret.ip = crawledDomain.ip;
+
+        addIpInfo(decorator, crawledDomain.ip);
+
+        if (isAcademicDomain(ret.domain)) {
+            decorator.addTerm("special:academia");
+        }
+
+        if (crawledDomain.redirectDomain != null) {
+            ret.redirect = new EdgeDomain(crawledDomain.redirectDomain);
+        }
+
+        ret.state = getState(crawledDomain.crawlerStatus);
+
+        return ret;
+    }
+
+    private void addIpInfo(DocumentDecorator decorator, String ip) {
+        decorator.addTerm("ip:"+ip);

         // Add IP location country as a term
         String country = geoIpDictionary.getCountry(ip);
         if (!country.isBlank()) { // use the ip:-prefix as there's no real confusion between e.g. ip:127.0.0.1 and ip:uk
-            terms.add("ip:"+country.toLowerCase());
+            decorator.addTerm("ip:"+country.toLowerCase());
         }

         // Add ASN as a term
         geoIpDictionary.getAsnInfo(ip).ifPresent(asnInfo -> {
-            terms.add("as:"+asnInfo.asn());
+            decorator.addTerm("as:"+asnInfo.asn());

             for (var orgPart : StringUtils.split(asnInfo.org(), '-')) {
-                terms.add("as:"+orgPart.toLowerCase());
+                decorator.addTerm("as:"+orgPart.toLowerCase());
             }

             if (isCloudy(asnInfo)) {
-                terms.add("special:cloud");
+                decorator.addTerm("special:cloud");
             }
         });
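
Note that the rewritten loop leans on the new crawl data ordering: the CrawledDomain record is expected to precede its CrawledDocument records, so the DocumentDecorator already exists when the first document arrives (see also the integration test change below). Stripped of error handling, the new loop has roughly this shape (a simplified sketch of the code above, not a verbatim excerpt):

    // Simplified sketch of DomainProcessor.process() after this change.
    try (var deduplicator = new LshDocumentDeduplicator()) {
        while (dataStream.hasNext()) {
            var data = dataStream.next();

            if (data instanceof CrawledDomain crawledDomain) {
                // The domain record comes first in the new format, so the decorator
                // is ready before any document shows up.
                documentDecorator = new DocumentDecorator(anchorTextKeywords, externalDomainLinks);
                ret = processDomain(crawledDomain, ret, documentDecorator);
                ret.documents = docs;
            }
            else if (data instanceof CrawledDocument doc) {
                var processedDoc = documentProcessor.process(doc, externalDomainLinks, documentDecorator);
                deduplicator.markIfDuplicate(processedDoc);
                docs.add(processedDoc);
            }
        }
    }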

View File

@@ -1,74 +1,43 @@
 package nu.marginalia.converting.processor.logic;

 import com.google.inject.Singleton;
+import gnu.trove.list.array.TLongArrayList;
 import nu.marginalia.model.crawl.UrlIndexingState;
 import nu.marginalia.converting.model.ProcessedDocument;
 import nu.marginalia.lsh.EasyLSH;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import java.util.List;

 /** Deduplicates documents based on their LSH
  *
  * @see EasyLSH
  */
 @Singleton
-public class LshDocumentDeduplicator {
+public class LshDocumentDeduplicator implements AutoCloseable {

-    private final int DISTANCE_THRESHOLD = 2;
-    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private final TLongArrayList hashCodes = new TLongArrayList(1000);
+    private static final int DISTANCE_THRESHOLD = 2;

-    public void deduplicate(List<ProcessedDocument> documents) {
-        ProcessedDocument[] goodDocuments = documents.stream()
-                .filter(ProcessedDocument::isProcessedFully)
-                .filter(doc -> doc.words.size() > 100)
-                .toArray(ProcessedDocument[]::new);
-
-        long[] hashCodes = new long[goodDocuments.length];
-        for (int i = 0; i < goodDocuments.length; i++) {
-            hashCodes[i] = goodDocuments[i].details.hashCode;
-        }
-
-        // These arrays can be fairly large (~10,000) so we need to be
-        // careful about what we do in this O(n^2) loop
-
-        for (int i = 0; i < hashCodes.length; i++) {
-            for (int j = 0; j < hashCodes.length; j++) {
-                // This is basically just a 64 bit XOR and a POPCOUNT so it's pretty fast.
-                if (EasyLSH.hammingDistance(hashCodes[i], hashCodes[j]) < DISTANCE_THRESHOLD) {
-                    if (i == j)
-                        continue;
-
-                    if (flagIfDuplicate(goodDocuments[i], goodDocuments[j])) {
-                        break;
-                    }
-                }
-            }
-        }
-    }
-
-    private boolean flagIfDuplicate(ProcessedDocument thisDoc, ProcessedDocument otherDoc) {
-        // This document has already been disqualified as a duplicate
-        if (thisDoc.state != UrlIndexingState.OK)
-            return false;
-
-        // We might consider using thisDoc.details.metadata.topology() here instead of the
-        // URL length to determine which document is the "better" one.
-        if (thisDoc.url.path.length()
-                < otherDoc.url.path.length())
-        {
-            logger.debug("{} duplicates {}", otherDoc.url, thisDoc.url);
-
-            otherDoc.state = UrlIndexingState.DISQUALIFIED;
-            otherDoc.stateReason = "Duplicate";
-
-            return true;
-        }
-
-        return false;
-    }
+    public void markIfDuplicate(ProcessedDocument document) {
+        if (!document.isProcessedFully()) {
+            return;
+        }
+
+        if (document.words.size() < 100) {
+            return;
+        }
+
+        long hashCode = document.details.hashCode;
+
+        for (int i = 0; i < hashCodes.size(); i++) {
+            if (EasyLSH.hammingDistance(hashCode, hashCodes.get(i)) < DISTANCE_THRESHOLD) {
+                document.state = UrlIndexingState.DISQUALIFIED;
+                document.stateReason = "Duplicate";
+                return;
+            }
+        }
+
+        hashCodes.add(hashCode);
+    }
+
+    @Override
+    public void close() throws Exception {
+        hashCodes.clear(1);
+    }
 }
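
The deduplicator thus changes from a batch pass over all of a domain's documents into a streaming filter: each document's LSH is compared against the hashes seen so far, the first occurrence is kept, and later near-duplicates are disqualified as they arrive. (The old pairwise pass instead kept the document with the shorter URL path.) A minimal usage sketch; deduplicateStream is a hypothetical helper, not code from this commit:

    // Sketch: streaming deduplication as documents are processed.
    // close() declares throws Exception, so the caller has to handle that.
    void deduplicateStream(List<ProcessedDocument> processedDocuments) throws Exception {
        try (var deduplicator = new LshDocumentDeduplicator()) {
            for (ProcessedDocument doc : processedDocuments) {
                // Disqualifies doc if its LSH hash is within DISTANCE_THRESHOLD
                // of a document seen earlier in the stream.
                deduplicator.markIfDuplicate(doc);
            }
        }
    }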

View File

@@ -139,10 +139,13 @@ public class ConvertingIntegrationTest {

     private SerializableCrawlDataStream asSerializableCrawlData(CrawledDomain domain) {
         List<SerializableCrawlData> data = new ArrayList<>();

+        data.add(domain);
+
         if (domain.doc != null) {
             data.addAll(domain.doc);
         }
-        data.add(domain);

         return SerializableCrawlDataStream.fromIterator(data.iterator());
     }