(executor) Actor for exporting anchor tag data from crawl data

Viktor Lofgren 2023-10-31 17:32:34 +01:00
parent ffadfb4149
commit 23f2068e33
5 changed files with 182 additions and 7 deletions

View File

@@ -31,11 +31,14 @@ dependencies {
implementation project(':code:common:service-client')
implementation project(':code:common:service-discovery')
implementation project(':third-party:commons-codec')
implementation project(':code:libraries:message-queue')
implementation project(':code:process-models:crawl-spec')
implementation project(':code:process-models:crawling-model')
implementation project(':code:features-crawl:link-parser')
implementation project(':code:features-index:index-journal')
implementation project(':code:api:index-api')
implementation project(':code:api:query-api')
implementation project(':code:api:process-mqapi')
@@ -48,9 +51,11 @@ dependencies {
implementation libs.prometheus
implementation libs.notnull
implementation libs.guice
implementation libs.trove
implementation libs.protobuf
implementation libs.rxjava
implementation libs.zstd
implementation libs.jsoup
implementation libs.commons.io
implementation libs.bundles.mariadb

View File

@@ -12,6 +12,7 @@ public enum ExecutorActor {
ADJACENCY_CALCULATION,
CRAWL_JOB_EXTRACTOR,
EXPORT_DATA,
EXPORT_ATAGS,
PROC_INDEX_CONSTRUCTOR_SPAWNER,
CONVERT,
RESTORE_BACKUP;

View File

@@ -46,7 +46,8 @@ public class ExecutorActorControlService {
IndexConstructorMonitorActor indexConstructorMonitorActor,
TriggerAdjacencyCalculationActor triggerAdjacencyCalculationActor,
CrawlJobExtractorActor crawlJobExtractorActor,
ExportDataActor exportDataActor
ExportDataActor exportDataActor,
ExportAtagsActor exportAtagsActor
) {
this.messageQueueFactory = messageQueueFactory;
this.eventLog = baseServiceParams.eventLog;
@ -70,6 +71,7 @@ public class ExecutorActorControlService {
register(ExecutorActor.ADJACENCY_CALCULATION, triggerAdjacencyCalculationActor);
register(ExecutorActor.CRAWL_JOB_EXTRACTOR, crawlJobExtractorActor);
register(ExecutorActor.EXPORT_DATA, exportDataActor);
register(ExecutorActor.EXPORT_ATAGS, exportAtagsActor);
}
private void register(ExecutorActor process, ActorPrototype graph) {

View File

@@ -0,0 +1,172 @@
package nu.marginalia.actor.task;
import com.google.gson.Gson;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import gnu.trove.set.hash.TLongHashSet;
import lombok.SneakyThrows;
import nu.marginalia.hash.MurmurHash3_128;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.storage.model.*;
import org.jsoup.Jsoup;
import nu.marginalia.actor.prototype.RecordActorPrototype;
import nu.marginalia.actor.state.ActorStep;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.storage.FileStorageService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.PosixFilePermissions;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.zip.GZIPOutputStream;
@Singleton
public class ExportAtagsActor extends RecordActorPrototype {
private static final LinkParser linkParser = new LinkParser();
private static final MurmurHash3_128 hash = new MurmurHash3_128();
private final FileStorageService storageService;
private final Logger logger = LoggerFactory.getLogger(getClass());
public record Export(FileStorageId crawlId) implements ActorStep {}
public record Run(FileStorageId crawlId, FileStorageId destId) implements ActorStep {}
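// Two-step flow: Export allocates a fresh EXPORT file storage area for the output,
// then Run walks the crawl data identified by crawlId and writes the anchor tag CSV into it.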
@Override
public ActorStep transition(ActorStep self) throws Exception {
return switch(self) {
case Export(FileStorageId crawlId) -> {
var storageBase = storageService.getStorageBase(FileStorageBaseType.STORAGE);
var storage = storageService.allocateTemporaryStorage(storageBase, FileStorageType.EXPORT, "atag-export", "Anchor Tags " + LocalDateTime.now());
if (storage == null) yield new Error("Bad storage id");
yield new Run(crawlId, storage.id());
}
case Run(FileStorageId crawlId, FileStorageId destId) -> {
FileStorage destStorage = storageService.getStorage(destId);
storageService.setFileStorageState(destId, FileStorageState.NEW);
var tmpFile = Files.createTempFile(destStorage.asPath(), "atags", ".csv.gz",
PosixFilePermissions.asFileAttribute(PosixFilePermissions.fromString("rw-r--r--")));
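// The export is written gzip-compressed to a temporary file inside the destination storage,
// and only moved into its final place (atomically) once the whole export has succeeded.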
Path inputDir = storageService.getStorage(crawlId).asPath();
var reader = new CrawledDomainReader();
try (var bw = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(Files.newOutputStream(tmpFile, StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING))));
)
{
Path crawlerLogFile = inputDir.resolve("crawler.log");
var tagWriter = new ATagCsvWriter(bw);
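// crawler.log enumerates the crawl data files; each one is processed independently,
// so a failure in a single file is caught and does not abort the rest of the export.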
for (var item : WorkLog.iterable(crawlerLogFile)) {
if (Thread.interrupted()) {
throw new InterruptedException();
}
Path crawlDataPath = inputDir.resolve(item.relPath());
try (var stream = reader.createDataStream(crawlDataPath)) {
exportLinks(tagWriter, stream);
}
catch (Exception ex) {
ex.printStackTrace();
}
}
Files.move(tmpFile, destStorage.asPath().resolve("atags.csv.gz"), StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
storageService.setFileStorageState(destId, FileStorageState.UNSET);
}
catch (Exception ex) {
logger.error("Failed to export blacklist", ex);
storageService.setFileStorageState(destId, FileStorageState.DELETE);
yield new Error("Failed to export blacklist");
}
finally {
Files.deleteIfExists(tmpFile);
}
yield new End();
}
default -> new Error();
};
}
private boolean exportLinks(ATagCsvWriter exporter, SerializableCrawlDataStream stream) throws IOException, URISyntaxException {
final TLongHashSet hashes = new TLongHashSet();
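// Keys of already-exported links for this stream, so duplicate (anchor text, destination) pairs are written only once.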
while (stream.hasNext()) {
if (!(stream.next() instanceof CrawledDocument doc))
continue;
if (null == doc.documentBody)
continue;
var baseUrl = new EdgeUrl(doc.url);
var parsed = Jsoup.parse(doc.documentBody);
for (var atag : parsed.getElementsByTag("a")) {
String linkText = atag.text();
if (linkText.isBlank())
continue;
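// Keep only links that point off-site, deduplicated on a 64-bit key formed by
// XOR-ing the Murmur hashes of the anchor text and the destination URL.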
var linkOpt = linkParser.parseLinkPermissive(baseUrl, atag);
linkOpt
.filter(url -> !Objects.equals(url.domain, baseUrl.domain))
.filter(url -> hashes.add(hash.hashNearlyASCII(linkText) ^ hash.hashNearlyASCII(url.toString())))
.ifPresent(url -> exporter.accept(url, baseUrl.domain, linkText));
}
}
return true;
}
private static class ATagCsvWriter {
private final BufferedWriter writer;
private ATagCsvWriter(BufferedWriter writer) {
this.writer = writer;
}
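// Each row is "destination url","source domain","anchor text", with embedded
// quotes doubled and newlines flattened to spaces by csvify().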
@SneakyThrows
public void accept(EdgeUrl url, EdgeDomain domain, String linkText) {
writer.write(String.format("\"%s\",\"%s\",\"%s\"\n",
csvify(url),
csvify(domain),
csvify(linkText)));
}
private static String csvify(Object field) {
return field.toString()
.replace("\"", "\"\"")
.replace("\n", " ");
}
}
@Override
public String describe() {
return "Export anchor tags from crawl data";
}
@Inject
public ExportAtagsActor(Gson gson,
FileStorageService storageService)
{
super(gson);
this.storageService = storageService;
}
}
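
For reference (not part of the commit), here is a minimal, self-contained sketch of how the exported atags.csv.gz could be read back. The class name and the simple quoted-field parser are illustrative; the only assumption is the row format produced by ATagCsvWriter above: three double-quoted fields per line, embedded quotes doubled, newlines replaced by spaces.

// Minimal sketch (not part of the commit): reads the exported atags.csv.gz
// and prints "anchor text -> destination url" for each row.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;

public class AtagsCsvReader {
    public static void main(String[] args) throws Exception {
        Path file = Path.of(args[0]); // path to atags.csv.gz
        try (var reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(file)), StandardCharsets.UTF_8)))
        {
            String line;
            while ((line = reader.readLine()) != null) {
                List<String> fields = parseRow(line);
                // Field order as written by ATagCsvWriter: url, domain, link text
                if (fields.size() == 3) {
                    System.out.println(fields.get(2) + " -> " + fields.get(0));
                }
            }
        }
    }

    // Splits one row of double-quoted fields, un-doubling embedded quotes.
    private static List<String> parseRow(String row) {
        var fields = new ArrayList<String>();
        var current = new StringBuilder();
        boolean inQuotes = false;
        for (int i = 0; i < row.length(); i++) {
            char c = row.charAt(i);
            if (c == '"') {
                if (inQuotes && i + 1 < row.length() && row.charAt(i + 1) == '"') {
                    current.append('"'); // escaped quote inside a field
                    i++;
                }
                else {
                    inQuotes = !inQuotes;
                }
            }
            else if (c == ',' && !inQuotes) {
                fields.add(current.toString());
                current.setLength(0);
            }
            else {
                current.append(c);
            }
        }
        fields.add(current.toString());
        return fields;
    }
}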

View File

@@ -36,11 +36,6 @@ public class ExportDataActor extends RecordActorPrototype {
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
@AllArgsConstructor @With @NoArgsConstructor
public static class Message {
public FileStorageId storageId = null;
};
public record Export() implements ActorStep {}
public record ExportBlacklist(FileStorageId fid) implements ActorStep {}
public record ExportDomains(FileStorageId fid) implements ActorStep {}