Faster crawling

This commit is contained in:
vlofgren 2022-08-10 17:03:58 +02:00
parent 9c6e3b1772
commit ce09fce639
11 changed files with 179 additions and 150 deletions

View File: ConverterMain.java

@ -1,6 +1,5 @@
package nu.marginalia.wmsa.edge.converting;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
@ -10,8 +9,6 @@ import nu.marginalia.wmsa.edge.converting.interpreter.Instruction;
import nu.marginalia.wmsa.edge.converting.processor.DomainProcessor;
import nu.marginalia.wmsa.edge.converting.processor.InstructionsCompiler;
import nu.marginalia.wmsa.edge.crawling.CrawlPlanLoader;
import nu.marginalia.wmsa.edge.crawling.CrawledDomainReader;
import nu.marginalia.wmsa.edge.crawling.CrawlerSpecificationLoader;
import nu.marginalia.wmsa.edge.crawling.WorkLog;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDomain;
import nu.marginalia.wmsa.edge.model.EdgeCrawlPlan;
@ -20,24 +17,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class ConverterMain {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final DomainProcessor processor;
private final InstructionsCompiler compiler;
private final WorkLog processLog;
private final CrawledInstructionWriter instructionWriter;
private final Gson gson;
private final CrawledDomainReader reader = new CrawledDomainReader();
private final Map<String, String> domainToId = new HashMap<>();
private final Map<String, String> idToFileName = new HashMap<>();
public static void main(String... args) throws IOException {
if (args.length != 1) {
@ -60,65 +46,42 @@ public class ConverterMain {
InstructionsCompiler compiler,
Gson gson
) throws Exception {
this.processor = processor;
this.compiler = compiler;
this.gson = gson;
instructionWriter = new CrawledInstructionWriter(plan.process.getDir(), gson);
logger.info("Loading input spec");
CrawlerSpecificationLoader.readInputSpec(plan.getJobSpec(),
spec -> domainToId.put(spec.domain, spec.id));
logger.info("Replaying crawl log");
WorkLog.readLog(plan.crawl.getLogFile(),
entry -> idToFileName.put(entry.id(), entry.path()));
logger.info("Starting pipe");
processLog = new WorkLog(plan.process.getLogFile());
try (WorkLog processLog = plan.createProcessWorkLog()) {
var pipe = new ParallelPipe<CrawledDomain, ProcessingInstructions>("Crawler", 48, 4, 2) {
var pipe = new ParallelPipe<CrawledDomain, ProcessingInstructions>("Crawler", 48, 4, 2) {
@Override
protected ProcessingInstructions onProcess(CrawledDomain domainData) {
var processed = processor.process(domainData);
return new ProcessingInstructions(domainData.id, compiler.compile(processed));
}
@Override
protected ProcessingInstructions onProcess(CrawledDomain domainData) {
var processed = processor.process(domainData);
var compiled = compiler.compile(processed);
@Override
protected void onReceive(ProcessingInstructions processedInstructions) throws IOException {
var instructions = processedInstructions.instructions;
instructions.removeIf(Instruction::isNoOp);
String where = instructionWriter.accept(processedInstructions.id, instructions);
processLog.setJobToFinished(processedInstructions.id, where, instructions.size());
}
};
domainToId.forEach((domain, id) -> {
String fileName = idToFileName.get(id);
if (Strings.isNullOrEmpty(fileName))
return;
Path dest = plan.getCrawledFilePath(fileName);
logger.info("{} - {} - {}", domain, id, dest);
if (!processLog.isJobFinished(id)) {
try {
var cd = reader.read(dest);
pipe.accept(cd);
} catch (IOException e) {
logger.error("Failed to read {}", dest);
return new ProcessingInstructions(domainData.id, compiled);
}
}
});
pipe.join();
@Override
protected void onReceive(ProcessingInstructions processedInstructions) throws IOException {
var instructions = processedInstructions.instructions;
instructions.removeIf(Instruction::isNoOp);
processLog.close();
String where = instructionWriter.accept(processedInstructions.id, instructions);
processLog.setJobToFinished(processedInstructions.id, where, instructions.size());
}
};
plan.forEachCrawledDomain(domain -> {
if (!processLog.isJobFinished(domain.id)) {
logger.info("{} - {}", domain.domain, domain.id);
pipe.accept(domain);
}
});
pipe.join();
}
logger.info("Finished");

View File: LinkKeywordExtractorMain.java

@ -61,6 +61,7 @@ public class LinkKeywordExtractorMain {
.forEach(crawledUrls::add);
logger.info("Loading input spec");
CrawlerSpecificationLoader.readInputSpec(plan.getJobSpec(),
spec -> { crawledDomains.add(spec.domain); });

View File: CrawledDomainReader.java

@ -3,12 +3,15 @@ package nu.marginalia.wmsa.edge.crawling;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDocument;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDomain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
public class CrawledDomainReader {
private final Gson gson = new GsonBuilder().create();
@ -18,13 +21,37 @@ public class CrawledDomainReader {
}
public CrawledDomain read(Path path) throws IOException {
List<CrawledDocument> docs = new ArrayList<>();
CrawledDomain domain = null;
try (var br = new BufferedReader(new InputStreamReader(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile())))))) {
String line;
while ((line = br.readLine()) != null) {
if (line.startsWith("//")) {
String nextLine = br.readLine();
if (nextLine == null) break;
if (line.equals(CrawledDomain.SERIAL_IDENTIFIER)) {
domain = gson.fromJson(nextLine, CrawledDomain.class);
} else if (line.equals(CrawledDocument.SERIAL_IDENTIFIER)) {
docs.add(gson.fromJson(nextLine, CrawledDocument.class));
}
}
else if (line.charAt(0) == '{') {
domain = gson.fromJson(line, CrawledDomain.class);
}
}
}
if (domain == null) {
return null;
}
domain.doc.addAll(docs);
return domain;
}
public CrawledDomain readRuntimeExcept(Path path) {
try {
return read(path);
}
catch (Exception ex) {
throw new RuntimeException(ex);
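
For orientation, the file format implied by the reader above and by CrawledDomainWriter.accept(): each record is one serial-identifier line followed by one line of JSON, documents are written as they are fetched, and the domain record comes last. A hypothetical decompressed file (values invented, field lists truncated) might look like:

// DOCUMENT
{"crawlId":"example-id","url":"https://www.example.com/", ...}
// DOCUMENT
{"crawlId":"example-id","url":"https://www.example.com/about", ...}
// DOMAIN
{"id":"example-id","domain":"www.example.com", ...}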

View File: CrawledDomainWriter.java

@ -3,40 +3,44 @@ package nu.marginalia.wmsa.edge.crawling;
import com.github.luben.zstd.ZstdOutputStream;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDomain;
import nu.marginalia.wmsa.edge.crawling.model.SerializableCrawlData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.file.Files;
import java.nio.file.Path;
public class CrawledDomainWriter implements AutoCloseable {
private final Path outputDir;
private final Gson gson = new GsonBuilder().create();
private static final Logger logger = LoggerFactory.getLogger(CrawledDomainWriter.class);
private final Writer writer;
private final Path outputFile;
public CrawledDomainWriter(Path outputDir, String name, String id) throws IOException {
this.outputDir = outputDir;
if (!Files.isDirectory(outputDir)) {
throw new IllegalArgumentException("Output dir " + outputDir + " does not exist");
}
outputFile = getOutputFile(id, name);
writer = new OutputStreamWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(outputFile))));
}
public Path getOutputFile() {
return outputFile;
}
public void accept(SerializableCrawlData data) throws IOException {
writer.write(data.getSerialIdentifier());
writer.write('\n');
gson.toJson(data, writer);
writer.write('\n');
}
private Path getOutputFile(String id, String name) throws IOException {
@ -63,4 +67,9 @@ public class CrawledDomainWriter {
return nameSaneBuilder.toString();
}
@Override
public void close() throws IOException {
writer.close();
}
}
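
A rough round-trip sketch (not part of this commit) of how the new writer and reader fit together, mirroring CrawlerMain.fetchDomain() on the write side and EdgeCrawlPlan.forEachCrawledDomain() on the read side. The directory and record values are hypothetical, the CrawledDomain constructor arguments follow the call in CrawlerRetreiver.crawlDomain(), and it assumes the writer creates any subdirectories it needs under the output directory:

import nu.marginalia.wmsa.edge.crawling.CrawledDomainReader;
import nu.marginalia.wmsa.edge.crawling.CrawledDomainWriter;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDomain;

import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;

class CrawlDataRoundTripSketch {
    public static void main(String[] args) throws Exception {
        Path outputDir = Files.createTempDirectory("crawl-data");
        Path file;

        // Records are streamed out one by one; the domain record is written last.
        try (var writer = new CrawledDomainWriter(outputDir, "www.example.com", "example-id")) {
            writer.accept(new CrawledDomain("example-id", "www.example.com", null,
                    "OK", null, "127.0.0.1", new ArrayList<>(), null));
            file = writer.getOutputFile();
        }

        // read() reassembles the records into a single CrawledDomain, or returns
        // null if the file contained no domain record.
        CrawledDomain domain = new CrawledDomainReader().read(file);
        System.out.println(domain.domain + " - " + domain.size() + " documents");
    }
}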

View File: CrawlerMain.java

@ -4,68 +4,38 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import nu.marginalia.wmsa.configuration.UserAgent;
import nu.marginalia.wmsa.configuration.WmsaHome;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDomain;
import nu.marginalia.wmsa.edge.crawling.model.CrawlingSpecification;
import nu.marginalia.wmsa.edge.crawling.retreival.CrawlerRetreiver;
import nu.marginalia.wmsa.edge.crawling.retreival.HttpFetcher;
import nu.marginalia.util.ParallelPipe;
import nu.marginalia.wmsa.edge.model.EdgeCrawlPlan;
import okhttp3.Dispatcher;
import okhttp3.internal.Util;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.Path;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
public class CrawlerMain implements AutoCloseable {
public static Gson gson = new GsonBuilder().create();
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Path inputSpec;
private final EdgeCrawlPlan plan;
private final Path crawlDataDir;
private final WorkLog workLog;
private final CrawledDomainWriter domainWriter;
private final int numberOfThreads;
private final ParallelPipe<CrawlingSpecification, CrawledDomain> pipe;
private final Dispatcher dispatcher = new Dispatcher(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 5, TimeUnit.SECONDS,
new SynchronousQueue<>(), Util.threadFactory("OkHttp Dispatcher", true)));
private final UserAgent userAgent;
public CrawlerMain(EdgeCrawlPlan plan) throws Exception {
this.inputSpec = plan.getJobSpec();
this.numberOfThreads = 512;
this.plan = plan;
this.userAgent = WmsaHome.getUserAgent();
workLog = plan.createCrawlWorkLog();
crawlDataDir = plan.crawl.getDir();
}
public static void main(String... args) throws Exception {
@ -84,48 +54,54 @@ public class CrawlerMain implements AutoCloseable {
crawler.run();
}
// TODO (2022-05-24): Some thread isn't set to daemon mode, need to explicitly harakiri the process, find why?
System.exit(0);
}
private void fetchDomain(CrawlingSpecification specification) {
if (workLog.isJobFinished(specification.id))
return;
var fetcher = new HttpFetcher(userAgent.uaString(), dispatcher);
try (var writer = new CrawledDomainWriter(crawlDataDir, specification.domain, specification.id)) {
var retreiver = new CrawlerRetreiver(fetcher, specification, writer);
int size = retreiver.fetch();
workLog.setJobToFinished(specification.id, writer.getOutputFile().toString(), size);
logger.info("Fetched {}", specification.domain);
} catch (Exception e) {
logger.error("Error fetching domain", e);
}
}
public void run() throws InterruptedException {
// First a validation run to ensure the file is all good to parse
logger.info("Validating JSON");
plan.forEachCrawlingSpecification(unused -> {});
logger.info("Let's go");
final int poolSize = 1024;
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(10);
ThreadPoolExecutor pool = new ThreadPoolExecutor(poolSize/128, poolSize, 5, TimeUnit.MINUTES, queue); // maybe need to set -Xss for JVM to deal with this?
AbortMonitor abortMonitor = AbortMonitor.getInstance();
plan.forEachCrawlingSpecification(spec -> {
if (abortMonitor.isAlive()) {
pool.execute(() -> fetchDomain(spec));
}
});
logger.info("Awaiting termination");
pool.shutdown();
while (!pool.awaitTermination(1, TimeUnit.SECONDS));
logger.info("All finished");
}
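
As an aside on the "-Xss" question in the inline comment above: a sketch of one alternative (not in this commit) is to hand the pool a ThreadFactory that requests a smaller stack per crawler thread instead of changing the global JVM setting. The 256 KB figure is an arbitrary example, and the stackSize argument of Thread's constructor is only a hint that a JVM may ignore.

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

class SmallStackThreadFactory implements ThreadFactory {
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public Thread newThread(Runnable r) {
        // Thread(group, target, name, stackSize) lets the caller suggest a per-thread stack size
        return new Thread(null, r, "crawler-" + counter.incrementAndGet(), 256 * 1024);
    }
}

// It would plug into the ThreadFactory-accepting ThreadPoolExecutor constructor:
// new ThreadPoolExecutor(poolSize/128, poolSize, 5, TimeUnit.MINUTES, queue, new SmallStackThreadFactory());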

View File: CrawledDocument.java

@ -3,7 +3,7 @@ package nu.marginalia.wmsa.edge.crawling.model;
import lombok.Builder;
@Builder
public class CrawledDocument implements SerializableCrawlData {
public String crawlId;
public String url;
@ -22,4 +22,10 @@ public class CrawledDocument {
public String canonicalUrl;
public String redirectUrl;
public static final String SERIAL_IDENTIFIER = "// DOCUMENT";
@Override
public String getSerialIdentifier() {
return SERIAL_IDENTIFIER;
}
}

View File: CrawledDomain.java

@ -7,7 +7,7 @@ import lombok.Data;
import java.util.List;
@AllArgsConstructor @Data @Builder
public class CrawledDomain implements SerializableCrawlData {
public String id;
public String domain;
@ -24,4 +24,10 @@ public class CrawledDomain {
if (doc == null) return 0;
return doc.size();
}
public static final String SERIAL_IDENTIFIER = "// DOMAIN";
@Override
public String getSerialIdentifier() {
return SERIAL_IDENTIFIER;
}
}

View File: SerializableCrawlData.java

@ -0,0 +1,5 @@
package nu.marginalia.wmsa.edge.crawling.model;
public interface SerializableCrawlData {
String getSerialIdentifier();
}
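
The contract is deliberately small: CrawledDomainWriter writes getSerialIdentifier() on its own line before each record's JSON, and CrawledDomainReader switches on that identifier to pick the class to deserialize. A purely hypothetical additional record type (invented here, not part of the commit; the reader would still need a matching branch) would look like:

package nu.marginalia.wmsa.edge.crawling.model;

// Hypothetical example only; the name and fields are invented for illustration.
public class CrawledRobotsTxt implements SerializableCrawlData {
    public String domain;
    public String contents;

    public static final String SERIAL_IDENTIFIER = "// ROBOTS";

    @Override
    public String getSerialIdentifier() {
        return SERIAL_IDENTIFIER;
    }
}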

View File: CrawlerRetreiver.java

@ -4,6 +4,7 @@ import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;
import lombok.SneakyThrows;
import nu.marginalia.wmsa.edge.converting.processor.logic.LinkParser;
import nu.marginalia.wmsa.edge.crawling.CrawledDomainWriter;
import nu.marginalia.wmsa.edge.crawling.blocklist.GeoIpBlocklist;
import nu.marginalia.wmsa.edge.crawling.blocklist.IpBlockList;
import nu.marginalia.wmsa.edge.crawling.blocklist.UrlBlocklist;
@ -14,6 +15,7 @@ import org.jsoup.nodes.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.LocalDateTime;
@ -29,6 +31,7 @@ public class CrawlerRetreiver {
private final int depth;
private final String id;
private final String domain;
private final CrawledDomainWriter crawledDomainWriter;
private static final LinkParser linkParser = new LinkParser();
private static final Logger logger = LoggerFactory.getLogger(CrawlerRetreiver.class);
@ -45,7 +48,7 @@ public class CrawlerRetreiver {
}
}
public CrawlerRetreiver(HttpFetcher fetcher, CrawlingSpecification specs, CrawledDomainWriter crawledDomainWriter) {
this.fetcher = fetcher;
visited = new HashSet<>((int)(specs.urls.size() * 1.5));
known = new HashSet<>(specs.urls.size() * 10);
@ -53,6 +56,7 @@ public class CrawlerRetreiver {
depth = specs.crawlDepth;
id = specs.id;
domain = specs.domain;
this.crawledDomainWriter = crawledDomainWriter;
specs.urls.stream()
.map(this::parseUrl)
@ -78,12 +82,18 @@ public class CrawlerRetreiver {
}
}
public int fetch() throws IOException {
logger.info("Fetching {}", domain);
Optional<CrawledDomain> probeResult = probeDomainForProblems(domain);
if (probeResult.isPresent()) {
crawledDomainWriter.accept(probeResult.get());
return 1;
}
else {
return crawlDomain();
}
}
private Optional<CrawledDomain> probeDomainForProblems(String domain) {
@ -118,7 +128,7 @@ public class CrawlerRetreiver {
return Optional.empty();
}
private int crawlDomain() throws IOException {
String ip = findIp(domain);
assert !queue.isEmpty();
@ -130,6 +140,8 @@ public class CrawlerRetreiver {
CrawledDomain ret = new CrawledDomain(id, domain, null, CrawlerDomainStatus.OK.name(), null, ip, docs, null);
int visitedCount = 0;
int fetchedCount = 0;
while (!queue.isEmpty() && visitedCount < depth) {
var top = queue.removeFirst();
@ -150,7 +162,11 @@ public class CrawlerRetreiver {
logger.debug("Fetching {}", top);
long startTime = System.currentTimeMillis();
var doc = fetchUrl(top);
if (doc.isPresent()) {
fetchedCount++;
crawledDomainWriter.accept(doc.get());
}
long crawledTime = System.currentTimeMillis() - startTime;
delay(crawlDelay, crawledTime);
@ -160,7 +176,9 @@ public class CrawlerRetreiver {
ret.cookies = fetcher.getCookies();
crawledDomainWriter.accept(ret);
return fetchedCount;
}
private Optional<CrawledDocument> fetchUrl(EdgeUrl top) {

View File: HttpFetcher.java

@ -7,7 +7,6 @@ import crawlercommons.robots.SimpleRobotRulesParser;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.ToString;
import nu.marginalia.wmsa.edge.converting.processor.logic.LinkParser;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDocument;
import nu.marginalia.wmsa.edge.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.wmsa.edge.model.EdgeDomain;
@ -43,8 +42,6 @@ public class HttpFetcher {
private static final SimpleRobotRulesParser robotsParser = new SimpleRobotRulesParser();
private final LinkParser linkParser = new LinkParser();
public void setAllowAllContentTypes(boolean allowAllContentTypes) {
this.allowAllContentTypes = allowAllContentTypes;
}

View File: EdgeCrawlPlan.java

@ -5,9 +5,11 @@ import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.ToString;
import nu.marginalia.wmsa.edge.crawling.CrawledDomainReader;
import nu.marginalia.wmsa.edge.crawling.CrawlerSpecificationLoader;
import nu.marginalia.wmsa.edge.crawling.WorkLog;
import nu.marginalia.wmsa.edge.crawling.model.CrawlLogEntry;
import nu.marginalia.wmsa.edge.crawling.model.CrawledDomain;
import nu.marginalia.wmsa.edge.crawling.model.CrawlingSpecification;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
@ -51,7 +53,26 @@ public class EdgeCrawlPlan {
return process.getDir().resolve(sp1).resolve(sp2).resolve(fileName);
}
public WorkLog createCrawlWorkLog() throws IOException {
return new WorkLog(crawl.getLogFile());
}
public WorkLog createProcessWorkLog() throws IOException {
return new WorkLog(process.getLogFile());
}
public void forEachCrawlingSpecification(Consumer<CrawlingSpecification> consumer) {
CrawlerSpecificationLoader.readInputSpec(getJobSpec(), consumer);
}
public void forEachCrawlingLogEntry(Consumer<CrawlLogEntry> consumer) {
WorkLog.readLog(this.crawl.getLogFile(), consumer);
}
public void forEachProcessingLogEntry(Consumer<CrawlLogEntry> consumer) {
WorkLog.readLog(this.process.getLogFile(), consumer);
}
public void forEachCrawledDomain(Consumer<CrawledDomain> consumer) {
final CrawledDomainReader reader = new CrawledDomainReader();
try (Stream<CrawlLogEntry> entryStream = WorkLog.streamLog(crawl.getLogFile())) {