(crawler) Refactor HttpFetcher to integrate WarcRecorder

Partially hook in the WarcRecorder into the crawler process.  So far it's not read, but should record the crawled documents.

The WarcRecorder and HttpFetcher classes were also refactored and broken apart to be easier to reason about.
This commit is contained in:
Viktor Lofgren 2023-12-08 17:12:51 +01:00
parent 072b5fcd12
commit 3bbffd3c22
17 changed files with 603 additions and 233 deletions

View File

@ -8,6 +8,7 @@ import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
@ -18,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -46,7 +48,7 @@ public class CrawlingThenConvertingIntegrationTest {
}
@Test
public void crawlThenProcess() {
public void crawlThenProcess() throws IOException {
var specs = CrawlSpecRecord.builder()
.domain("www.marginalia.nu")
.crawlDepth(10)
@ -72,10 +74,12 @@ public class CrawlingThenConvertingIntegrationTest {
}
private CrawledDomain crawl(CrawlSpecRecord specs) {
private CrawledDomain crawl(CrawlSpecRecord specs) throws IOException {
List<SerializableCrawlData> data = new ArrayList<>();
new CrawlerRetreiver(httpFetcher, specs, data::add).fetch();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, specs, recorder, data::add).fetch();
}
CrawledDomain domain = data.stream().filter(CrawledDomain.class::isInstance).map(CrawledDomain.class::cast).findFirst().get();
data.stream().filter(CrawledDocument.class::isInstance).map(CrawledDocument.class::cast).forEach(domain.doc::add);

View File

@ -12,6 +12,7 @@ import nu.marginalia.atags.source.AnchorTagsSource;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.spec.CrawlSpecProvider;
import nu.marginalia.crawl.spec.DbCrawlSpecProvider;
import nu.marginalia.crawl.spec.ParquetCrawlSpecProvider;
@ -212,21 +213,23 @@ public class CrawlerMain {
HttpFetcher fetcher = new HttpFetcherImpl(userAgent.uaString(), dispatcher, connectionPool);
try (CrawledDomainWriter writer = new CrawledDomainWriter(outputDir, domain, id);
var warcRecorder = new WarcRecorder(); // write to a temp file for now
var retreiver = new CrawlerRetreiver(fetcher, specification, warcRecorder, writer::accept);
CrawlDataReference reference = getReference())
{
Thread.currentThread().setName("crawling:" + domain);
var domainLinks = anchorTagsSource.getAnchorTags(domain);
var retreiver = new CrawlerRetreiver(fetcher, specification, writer::accept);
int size = retreiver.fetch(domainLinks, reference);
workLog.setJobToFinished(domain, writer.getOutputFile().toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);
} catch (Exception e) {
logger.error("Error fetching domain " + domain, e);
}

View File

@ -8,6 +8,7 @@ import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.*;
import nu.marginalia.ip_blocklist.UrlBlocklist;
@ -20,13 +21,15 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.file.Path;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Consumer;
public class CrawlerRetreiver {
public class CrawlerRetreiver implements AutoCloseable {
private static final int MAX_ERRORS = 20;
@ -45,6 +48,7 @@ public class CrawlerRetreiver {
private static final DomainProber domainProber = new DomainProber();
private final SitemapRetriever sitemapRetriever;
private final DomainCrawlFrontier crawlFrontier;
private final WarcRecorder warcRecorder;
int errorCount = 0;
@ -56,7 +60,10 @@ public class CrawlerRetreiver {
public CrawlerRetreiver(HttpFetcher fetcher,
CrawlSpecRecord specs,
Consumer<SerializableCrawlData> writer) {
WarcRecorder warcRecorder,
Consumer<SerializableCrawlData> writer)
{
this.warcRecorder = warcRecorder;
this.fetcher = fetcher;
domain = specs.domain;
@ -121,7 +128,7 @@ public class CrawlerRetreiver {
assert !crawlFrontier.isEmpty();
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain);
final SimpleRobotRules robotsRules = fetcher.fetchRobotRules(crawlFrontier.peek().domain, warcRecorder);
final CrawlDelayTimer delayTimer = new CrawlDelayTimer(robotsRules.getCrawlDelay());
sniffRootDocument(delayTimer, rootUrl);
@ -419,7 +426,7 @@ public class CrawlerRetreiver {
private CrawledDocument tryDownload(EdgeUrl top, CrawlDelayTimer timer, ContentTags tags) {
for (int i = 0; i < 2; i++) {
try {
var doc = fetcher.fetchContent(top, tags);
var doc = fetcher.fetchContent(top, warcRecorder, tags);
doc.recrawlState = "NEW";
return doc;
}
@ -496,6 +503,11 @@ public class CrawlerRetreiver {
.build();
}
@Override
public void close() throws Exception {
warcRecorder.close();
}
private record DocumentWithReference(
@Nullable CrawledDocument doc,
@Nullable CrawlDataReference reference) {

View File

@ -0,0 +1,86 @@
package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.model.EdgeUrl;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.SocketTimeoutException;
import java.util.Objects;
public class ContentTypeProber {
private static final Logger logger = LoggerFactory.getLogger(ContentTypeProber.class);
private final String userAgent;
private final OkHttpClient client;
private final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
public ContentTypeProber(String userAgent, OkHttpClient httpClient) {
this.userAgent = userAgent;
this.client = httpClient;
}
/** Probe the content type of the given URL with a HEAD request.
* This is used to detect binary files, which we don't want to crawl.
* <p>
* If the URL redirects, the final URL is returned, to avoid redundant
* requests.
*
* @param url The URL to probe
* @return A ContentTypeProbeResult
*/
public ContentTypeProbeResult probeContentType(EdgeUrl url) {
logger.debug("Probing suspected binary {}", url);
var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgent)
.addHeader("Accept-Encoding", "gzip")
.url(url.toString());
var head = headBuilder.build();
var call = client.newCall(head);
try (var rsp = call.execute()) {
var contentTypeHeader = rsp.header("Content-type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new ContentTypeProbeResult.BadContentType(contentTypeHeader, rsp.code());
}
// Update the URL to the final URL of the HEAD request, otherwise we might end up doing
// HEAD 301 url1 -> url2
// HEAD 200 url2
// GET 301 url1 -> url2
// GET 200 url2
// which is not what we want. Overall we want to do as few requests as possible to not raise
// too many eyebrows when looking at the logs on the target server. Overall it's probably desirable
// that it looks like the traffic makes sense, as opposed to looking like a broken bot.
var redirectUrl = new EdgeUrl(rsp.request().url().toString());
EdgeUrl ret;
if (Objects.equals(redirectUrl.domain, url.domain)) ret = redirectUrl;
else ret = url;
return new ContentTypeProbeResult.Ok(ret);
} catch (SocketTimeoutException ex) {
return new ContentTypeProbeResult.Timeout();
} catch (Exception ex) {
logger.error("Error during fetching {}[{}]", ex.getClass().getSimpleName(), ex.getMessage());
return new ContentTypeProbeResult.Exception(ex);
}
}
public sealed interface ContentTypeProbeResult {
record Ok(EdgeUrl resolvedUrl) implements ContentTypeProbeResult { }
record BadContentType(String contentType, int statusCode) implements ContentTypeProbeResult { }
record Timeout() implements ContentTypeProbeResult { }
record Exception(java.lang.Exception ex) implements ContentTypeProbeResult { }
}
}

View File

@ -0,0 +1,75 @@
package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.model.EdgeUrl;
import java.time.LocalDateTime;
import java.util.Objects;
public class CrawledDocumentFactory {
public static CrawledDocument createHardErrorRsp(EdgeUrl url, Exception why) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ERROR.toString())
.crawlerStatusDesc(why.getClass().getSimpleName() + ": " + why.getMessage())
.timestamp(LocalDateTime.now().toString())
.url(url.toString())
.build();
}
public static CrawledDocument createUnknownHostError(EdgeUrl url) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ERROR.toString())
.crawlerStatusDesc("Unknown Host")
.timestamp(LocalDateTime.now().toString())
.url(url.toString())
.build();
}
public static CrawledDocument createTimeoutErrorRsp(EdgeUrl url) {
return CrawledDocument.builder()
.crawlerStatus("Timeout")
.timestamp(LocalDateTime.now().toString())
.url(url.toString())
.build();
}
public static CrawledDocument createErrorResponse(EdgeUrl url, HttpFetchResult.ResultOk rsp, CrawlerDocumentStatus status, String why) {
return CrawledDocument.builder()
.crawlerStatus(status.toString())
.crawlerStatusDesc(why)
.headers(rsp.headers().toString())
.contentType(Objects.requireNonNullElse(rsp.headers().get("Content-Type"), ""))
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.statusCode())
.url(url.toString())
.build();
}
public static CrawledDocument createErrorResponse(EdgeUrl url, String contentType, int statusCode, CrawlerDocumentStatus status, String why) {
return CrawledDocument.builder()
.crawlerStatus(status.toString())
.crawlerStatusDesc(why)
.headers("")
.contentType(contentType)
.timestamp(LocalDateTime.now().toString())
.httpStatus(statusCode)
.url(url.toString())
.build();
}
public static CrawledDocument createRedirectResponse(EdgeUrl url, HttpFetchResult.ResultOk rsp, EdgeUrl responseUrl) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.REDIRECT.name())
.redirectUrl(responseUrl.toString())
.headers(rsp.headers().toString())
.contentType(Objects.requireNonNullElse(rsp.headers().get("Content-Type"), ""))
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.statusCode())
.url(url.toString())
.build();
}
}

View File

@ -3,6 +3,7 @@ package nu.marginalia.crawl.retreival.fetcher;
import com.google.inject.ImplementedBy;
import crawlercommons.robots.SimpleRobotRules;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
@ -18,9 +19,9 @@ public interface HttpFetcher {
FetchResult probeDomain(EdgeUrl url);
CrawledDocument fetchContent(EdgeUrl url, ContentTags tags) throws RateLimitException;
CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) throws RateLimitException;
SimpleRobotRules fetchRobotRules(EdgeDomain domain);
SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder);
SitemapRetriever createSitemapRetriever();
}

View File

@ -8,7 +8,11 @@ import lombok.SneakyThrows;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawl.retreival.Cookies;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult;
import nu.marginalia.crawl.retreival.fetcher.socket.*;
import nu.marginalia.crawl.retreival.fetcher.warc.HttpFetchResult;
import static nu.marginalia.crawl.retreival.fetcher.CrawledDocumentFactory.*;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.model.EdgeDomain;
@ -32,16 +36,17 @@ import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPInputStream;
public class HttpFetcherImpl implements HttpFetcher {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final String userAgent;
private final int maxFetchSize = 1024*512;
private final Cookies cookies = new Cookies();
private static final SimpleRobotRulesParser robotsParser = new SimpleRobotRulesParser();
private final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
private final ContentTypeProber contentTypeProber;
@Override
public void setAllowAllContentTypes(boolean allowAllContentTypes) {
@ -88,13 +93,22 @@ public class HttpFetcherImpl implements HttpFetcher {
public HttpFetcherImpl(@Named("user-agent") String userAgent, Dispatcher dispatcher, ConnectionPool connectionPool) {
this.client = createClient(dispatcher, connectionPool);
this.userAgent = userAgent;
this.contentTypeProber = new ContentTypeProber(userAgent, client);
}
public HttpFetcherImpl(@Named("user-agent") String userAgent) {
this.client = createClient(null, new ConnectionPool());
this.userAgent = userAgent;
this.contentTypeProber = new ContentTypeProber(userAgent, client);
}
/**
* Probe the domain to see if it is reachable, attempting to identify which schema to use,
* and if there are any redirects. This is done by one or more HEAD requests.
*
* @param url The URL to probe.
* @return The result of the probe, indicating the state and the URL.
*/
@Override
@SneakyThrows
public FetchResult probeDomain(EdgeUrl url) {
@ -127,6 +141,7 @@ public class HttpFetcherImpl implements HttpFetcher {
@Override
@SneakyThrows
public CrawledDocument fetchContent(EdgeUrl url,
WarcRecorder warcRecorder,
ContentTags contentTags)
throws RateLimitException
{
@ -135,149 +150,96 @@ public class HttpFetcherImpl implements HttpFetcher {
// looks like it might be something else, we perform a HEAD first to check the content type
if (contentTags.isEmpty() && contentTypeLogic.isUrlLikeBinary(url))
{
logger.debug("Probing suspected binary {}", url);
var headBuilder = new Request.Builder().head()
.addHeader("User-agent", userAgent)
.addHeader("Accept-Encoding", "gzip")
.url(url.toString());
var head = headBuilder.build();
var call = client.newCall(head);
try (var rsp = call.execute()) {
var contentTypeHeader = rsp.header("Content-type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CONTENT_TYPE, "Early probe failed");
}
// Update the URL to the final URL of the HEAD request, otherwise we might end up doing
// HEAD 301 url1 -> url2
// HEAD 200 url2
// GET 301 url1 -> url2
// GET 200 url2
// which is not what we want. Overall we want to do as few requests as possible to not raise
// too many eyebrows when looking at the logs on the target server. Overall it's probably desirable
// that it looks like the traffic makes sense, as opposed to looking like a broken bot.
var redirectUrl = new EdgeUrl(rsp.request().url().toString());
if (Objects.equals(redirectUrl.domain, url.domain))
ContentTypeProbeResult probeResult = contentTypeProber.probeContentType(url);
switch (probeResult) {
case ContentTypeProbeResult.Ok(EdgeUrl redirectUrl) -> {
url = redirectUrl;
}
catch (SocketTimeoutException ex) {
return createTimeoutErrorRsp(url, ex);
}
catch (Exception ex) {
logger.error("Error during fetching {}[{}]", ex.getClass().getSimpleName(), ex.getMessage());
return createHardErrorRsp(url, ex);
}
}
case ContentTypeProbeResult.BadContentType (String contentType, int statusCode) -> {
return createErrorResponse(url, contentType, statusCode,
CrawlerDocumentStatus.BAD_CONTENT_TYPE,
contentType
);
}
case ContentTypeProbeResult.Timeout timeout -> {
return createTimeoutErrorRsp(url);
}
case ContentTypeProbeResult.Exception ex -> {
return createErrorFromException(url, ex.ex());
}
};
}
var getBuilder = new Request.Builder().get();
getBuilder.addHeader("User-agent", userAgent)
.url(url.toString())
.addHeader("Accept-Encoding", "gzip");
getBuilder.url(url.toString())
.addHeader("Accept-Encoding", "gzip")
.addHeader("User-agent", userAgent);
contentTags.paint(getBuilder);
var get = getBuilder.build();
var call = client.newCall(get);
HttpFetchResult result = warcRecorder.fetch(client, getBuilder.build());
try (var rsp = call.execute()) {
return extractBody(url, rsp);
if (result instanceof HttpFetchResult.ResultError err) {
return createErrorFromException(url, err.ex());
}
catch (RateLimitException rle) {
throw rle;
else if (result instanceof HttpFetchResult.ResultOk ok) {
try {
return extractBody(url, ok);
}
catch (Exception ex) {
return createErrorFromException(url, ex);
}
}
catch (SocketTimeoutException ex) {
return createTimeoutErrorRsp(url, ex);
}
catch (UnknownHostException ex) {
return createUnknownHostError(url, ex);
}
catch (SocketException | ProtocolException | IllegalCharsetNameException | SSLException | EOFException ex) {
// This is a bit of a grab-bag of errors that crop up
// IllegalCharsetName is egg on our face,
// but SSLException and EOFException are probably the server's fault
return createHardErrorRsp(url, ex);
}
catch (Exception ex) {
logger.error("Error during fetching", ex);
return createHardErrorRsp(url, ex);
else {
throw new IllegalStateException("Unknown result type " + result.getClass());
}
}
private CrawledDocument createHardErrorRsp(EdgeUrl url, Exception why) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ERROR.toString())
.crawlerStatusDesc(why.getClass().getSimpleName() + ": " + why.getMessage())
.timestamp(LocalDateTime.now().toString())
.url(url.toString())
.build();
private CrawledDocument createErrorFromException(EdgeUrl url, Exception exception) throws RateLimitException {
return switch (exception) {
case RateLimitException rle -> throw rle;
case SocketTimeoutException ex -> createTimeoutErrorRsp(url);
case UnknownHostException ex -> createUnknownHostError(url);
case SocketException ex -> createHardErrorRsp(url, ex);
case ProtocolException ex -> createHardErrorRsp(url, ex);
case IllegalCharsetNameException ex -> createHardErrorRsp(url, ex);
case SSLException ex -> createHardErrorRsp(url, ex);
case EOFException ex -> createHardErrorRsp(url, ex);
default -> {
logger.error("Error during fetching", exception);
yield createHardErrorRsp(url, exception);
}
};
}
private CrawledDocument createUnknownHostError(EdgeUrl url, Exception why) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ERROR.toString())
.crawlerStatusDesc("Unknown Host")
.timestamp(LocalDateTime.now().toString())
.url(url.toString())
.build();
}
private CrawledDocument extractBody(EdgeUrl url, HttpFetchResult.ResultOk rsp) throws IOException, RateLimitException {
private CrawledDocument createTimeoutErrorRsp(EdgeUrl url, Exception why) {
return CrawledDocument.builder()
.crawlerStatus("Timeout")
.crawlerStatusDesc(why.getMessage())
.timestamp(LocalDateTime.now().toString())
.url(url.toString())
.build();
}
private CrawledDocument createErrorResponse(EdgeUrl url, Response rsp, CrawlerDocumentStatus status, String why) {
return CrawledDocument.builder()
.crawlerStatus(status.toString())
.crawlerStatusDesc(why)
.headers(rsp.headers().toString())
.contentType(rsp.header("Content-type"))
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.code())
.url(url.toString())
.build();
}
var responseUrl = new EdgeUrl(rsp.uri());
private CrawledDocument extractBody(EdgeUrl url, Response rsp) throws IOException, URISyntaxException, RateLimitException {
var responseUrl = new EdgeUrl(rsp.request().url().toString());
if (!Objects.equals(responseUrl.domain, url.domain)) {
return createRedirectResponse(url, rsp, responseUrl);
}
if (rsp.code() == 429) {
throw new RateLimitException(rsp.header("Retry-After", "1000"));
if (rsp.statusCode() == 429) {
String retryAfter = Objects.requireNonNullElse(rsp.header("Retry-After"), "1000");
throw new RateLimitException(retryAfter);
}
var body = rsp.body();
if (null == body) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.ERROR, "No body");
}
var byteStream = rsp.getInputStream();
var byteStream = body.byteStream();
if ("gzip".equals(rsp.header("Content-encoding"))) {
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-type");
var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
byte[] data = byteStream.readNBytes(maxFetchSize);
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
@ -288,7 +250,7 @@ public class HttpFetcherImpl implements HttpFetcher {
return createErrorResponse(url, rsp, CrawlerDocumentStatus.BAD_CHARSET, "");
}
if (!isXRobotsTagsPermitted(rsp.headers("X-Robots-Tag"), userAgent)) {
if (!isXRobotsTagsPermitted(rsp.allHeaders("X-Robots-Tag"), userAgent)) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.ROBOTS_TXT.name())
.crawlerStatusDesc("X-Robots-Tag")
@ -301,15 +263,12 @@ public class HttpFetcherImpl implements HttpFetcher {
var strData = DocumentBodyToString.getStringData(contentType, data);
var canonical = rsp.header("rel=canonical", "");
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.OK.name())
.headers(rsp.headers().toString())
.contentType(rsp.header("Content-type"))
.contentType(contentTypeHeader)
.timestamp(LocalDateTime.now().toString())
.canonicalUrl(canonical)
.httpStatus(rsp.code())
.httpStatus(rsp.statusCode())
.url(responseUrl.toString())
.documentBody(strData)
.build();
@ -362,24 +321,11 @@ public class HttpFetcherImpl implements HttpFetcher {
return isPermittedGeneral;
}
private CrawledDocument createRedirectResponse(EdgeUrl url, Response rsp, EdgeUrl responseUrl) {
return CrawledDocument.builder()
.crawlerStatus(CrawlerDocumentStatus.REDIRECT.name())
.redirectUrl(responseUrl.toString())
.headers(rsp.headers().toString())
.contentType(rsp.header("Content-type"))
.timestamp(LocalDateTime.now().toString())
.httpStatus(rsp.code())
.url(url.toString())
.build();
}
@Override
public SimpleRobotRules fetchRobotRules(EdgeDomain domain) {
return fetchRobotsForProto("https", domain)
.or(() -> fetchRobotsForProto("http", domain))
public SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder) {
return fetchRobotsForProto("https", recorder, domain)
.or(() -> fetchRobotsForProto("http", recorder, domain))
.orElseGet(() -> new SimpleRobotRules(SimpleRobotRules.RobotRulesMode.ALLOW_ALL));
}
@ -388,10 +334,10 @@ public class HttpFetcherImpl implements HttpFetcher {
return new SitemapRetriever();
}
private Optional<SimpleRobotRules> fetchRobotsForProto(String proto, EdgeDomain domain) {
private Optional<SimpleRobotRules> fetchRobotsForProto(String proto, WarcRecorder recorder, EdgeDomain domain) {
try {
var url = new EdgeUrl(proto, domain, null, "/robots.txt", null);
return Optional.of(parseRobotsTxt(fetchContent(url, ContentTags.empty())));
return Optional.of(parseRobotsTxt(fetchContent(url, recorder, ContentTags.empty())));
}
catch (Exception ex) {
return Optional.empty();
@ -406,3 +352,4 @@ public class HttpFetcherImpl implements HttpFetcher {
}
}

View File

@ -14,11 +14,11 @@ public class IpInterceptingNetworkInterceptor implements Interceptor {
return chain.proceed(chain.request())
.newBuilder()
.addHeader("X-Remote-IP", IP)
.addHeader("X-Marginalia-Remote-IP", IP)
.build();
}
public static String getIpFromResponse(Response response) {
return response.header("X-Remote-IP");
return response.header("X-Marginalia-Remote-IP");
}
}

View File

@ -0,0 +1,31 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
import okhttp3.Headers;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.List;
public sealed interface HttpFetchResult {
record ResultOk(URI uri,
int statusCode,
Headers headers,
byte[] bytesRaw,
int bytesStart,
int bytesLength
) implements HttpFetchResult {
public InputStream getInputStream() {
return new ByteArrayInputStream(bytesRaw, bytesStart, bytesLength);
}
public String header(String name) {
return headers.get(name);
}
public List<String> allHeaders(String name) {
return headers.values(name);
}
};
record ResultError(Exception ex) implements HttpFetchResult { };
}

View File

@ -20,7 +20,11 @@ class WarcDigestBuilder {
}
public void update(byte[] buffer, int n) {
digest.update(buffer, 0, n);
update(buffer, 0, n);
}
public void update(byte[] buffer, int s, int n) {
digest.update(buffer, s, n);
}
public WarcDigest build() {

View File

@ -106,7 +106,8 @@ public class WarcProtocolReconstructor {
response.headers().toMultimap().forEach((k, values) -> {
String headerCapitalized = capitalizeHeader(k);
if (headerCapitalized.startsWith("X"))
// Omit pseudoheaders injected by the crawler itself
if (headerCapitalized.startsWith("X-Marginalia"))
return;
for (var value : values) {

View File

@ -1,7 +1,6 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.model.EdgeDomain;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.netpreserve.jwarc.*;
@ -13,14 +12,10 @@ import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Optional;
/** Based on JWarc's fetch method, APL 2.0 license
* <p></p>
@ -28,21 +23,39 @@ import java.util.Optional;
* as best is possible given not all the data is available at the same time and needs to
* be reconstructed.
*/
public class WarcRecordingFetcherClient implements AutoCloseable {
public class WarcRecorder implements AutoCloseable {
private static final int MAX_TIME = 30_000;
private static final int MAX_SIZE = 1024 * 1024 * 10;
private final WarcWriter writer;
private final Path warcFile;
private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class);
private final EdgeDomain domain;
private static final Logger logger = LoggerFactory.getLogger(WarcRecordingFetcherClient.class);
private ThreadLocal<byte[]> bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]);
private boolean temporaryFile = false;
public WarcRecordingFetcherClient(Path warcFile, EdgeDomain domain) throws IOException {
this.writer = new WarcWriter(warcFile);
this.domain = domain;
/**
* Create a new WarcRecorder that will write to the given file
*
* @param warcFile The file to write to
*/
public WarcRecorder(Path warcFile) throws IOException {
this.warcFile = warcFile;
this.writer = new WarcWriter(this.warcFile);
}
public Optional<WarcResponse> fetch(OkHttpClient client, Request request) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
/**
* Create a new WarcRecorder that will write to a temporary file
* and delete it when close() is called.
*/
public WarcRecorder() throws IOException {
this.warcFile = Files.createTempFile("warc", ".warc.gz");
this.writer = new WarcWriter(this.warcFile);
temporaryFile = true;
}
public HttpFetchResult fetch(OkHttpClient client, Request request) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
URI uri = request.url().uri();
WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
@ -52,20 +65,15 @@ public class WarcRecordingFetcherClient implements AutoCloseable {
Instant date = Instant.now();
long startMillis = date.toEpochMilli();
Path tempFileName = Files.createTempFile(domain.toString(), ".data");
var call = client.newCall(request);
int totalLength = 0;
WarcTruncationReason truncationReason = null;
ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer();
try (FileChannel tempFile =
(FileChannel) Files.newByteChannel(tempFileName, StandardOpenOption.READ, StandardOpenOption.WRITE);
var response = call.execute()
) {
try (var response = call.execute()) {
var body = response.body();
InputStream inputStream;
@ -82,29 +90,27 @@ public class WarcRecordingFetcherClient implements AutoCloseable {
ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response);
String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response);
tempFile.write(ByteBuffer.wrap(responseHeaders.getBytes()));
responseDigestBuilder.update(responseHeaders);
responseDataBuffer.put(responseHeaders);
responseDataBuffer.updateDigest(responseDigestBuilder, 0, responseDataBuffer.length());
int dataStart = responseDataBuffer.pos();
while (inputStream != null) {
int remainingLength;
int remainingLength = responseDataBuffer.remaining();
if (remainingLength == 0)
break;
if (MAX_SIZE > 0 && MAX_SIZE - totalLength < buf.length) {
remainingLength = (MAX_SIZE - totalLength);
} else {
remainingLength = buf.length;
}
int startPos = responseDataBuffer.pos();
int n = inputStream.read(buf, 0, remainingLength);
int n = responseDataBuffer.readFrom(inputStream, remainingLength);
if (n < 0)
break;
responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n);
responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n);
totalLength += n;
for (int i = 0; i < n; ) {
int written = tempFile.write(ByteBuffer.wrap(buf, i, n - i));
i += written;
}
responseDigestBuilder.update(buf, n);
payloadDigestBuilder.update(buf, n);
@ -118,11 +124,10 @@ public class WarcRecordingFetcherClient implements AutoCloseable {
}
}
tempFile.position(0);
WarcResponse.Builder responseBuilder = new WarcResponse.Builder(uri)
.blockDigest(responseDigestBuilder.build())
.date(date)
.body(MediaType.HTTP_RESPONSE, tempFile, tempFile.size());
.body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes());
if (ip != null) responseBuilder.ipAddress(InetAddress.getByName(ip));
@ -133,6 +138,8 @@ public class WarcRecordingFetcherClient implements AutoCloseable {
// Build and write the response
long pos = writer.position();
var warcResponse = responseBuilder.build();
warcResponse.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(warcResponse);
@ -154,20 +161,77 @@ public class WarcRecordingFetcherClient implements AutoCloseable {
warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(warcRequest);
return Optional.of(warcResponse);
return new HttpFetchResult.ResultOk(uri,
response.code(),
response.headers(),
responseDataBuffer.data,
dataStart,
responseDataBuffer.length() - dataStart);
}
catch (Exception ex) {
logger.warn("Failed to fetch URL {}", uri, ex);
return Optional.empty();
return new HttpFetchResult.ResultError(ex);
}
finally {
Files.deleteIfExists(tempFileName);
}
private class ResponseDataBuffer {
private final byte[] data;
private int length = 0;
private int pos = 0;
public ResponseDataBuffer() {
data = bufferThreadLocal.get();
}
public int pos() {
return pos;
}
public int length() {
return length;
}
public void put(String s) {
byte[] bytes = s.getBytes();
put(bytes, 0, bytes.length);
}
private void put(byte[] bytes, int i, int n) {
System.arraycopy(bytes, i, data, pos, n);
pos += n;
length += n;
}
public int readFrom(InputStream inputStream, int remainingLength) throws IOException {
int n = inputStream.read(data, pos, remainingLength);
if (n > 0) {
pos += n;
length += n;
}
return n;
}
public int remaining() {
return MAX_SIZE - pos;
}
public void updateDigest(WarcDigestBuilder digestBuilder, int startPos, int n) {
digestBuilder.update(data, startPos, n);
}
public byte[] copyBytes() {
byte[] copy = new byte[length];
System.arraycopy(data, 0, copy, 0, length);
return copy;
}
}
public void close() {
try {
writer.close();
if (temporaryFile)
Files.deleteIfExists(warcFile);
} catch (IOException e) {
throw new RuntimeException(e);
}

View File

@ -0,0 +1,59 @@
package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult.BadContentType;
import nu.marginalia.crawl.retreival.fetcher.ContentTypeProber.ContentTypeProbeResult.Ok;
import nu.marginalia.model.EdgeUrl;
import okhttp3.ConnectionPool;
import okhttp3.Dispatcher;
import okhttp3.OkHttpClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.URISyntaxException;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.*;
class ContentTypeProberTest {
ContentTypeProber prober;
@BeforeEach
void setUp() {
OkHttpClient client = new OkHttpClient.Builder()
.dispatcher(new Dispatcher(Executors.newVirtualThreadPerTaskExecutor()))
.connectionPool(new ConnectionPool(0, 1, TimeUnit.NANOSECONDS))
.build();
prober = new ContentTypeProber("test.marginalia.nu", client);
}
@Test
void probeContentType() throws URISyntaxException {
assertEquals(
new Ok(new EdgeUrl("https://www.marginalia.nu/robots.txt")),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/robots.txt")),
"robots.txt is expected to pass the probing test since it's text/plain"
);
assertEquals(
new BadContentType("image/png", 200),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/sanic.png")),
"sanic.png is expected to pass the probing test since it's image/png"
);
assertEquals(
new Ok(new EdgeUrl("https://www.marginalia.nu/dev/null")),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/dev/null")),
"Despite being a 404, we expect this to be passed as OK as it's NotMyJob(TM) to verify response codes"
);
assertEquals(
new Ok(new EdgeUrl("https://www.marginalia.nu/projects/edge/about.gmi/")),
prober.probeContentType(new EdgeUrl("https://www.marginalia.nu/projects/edge/about.gmi")),
"about.gmi is expected to give a redirect to about.gmi/ which is served as text/html"
);
}
}

View File

@ -1,8 +1,7 @@
package nu.marginalia.crawl.retreival.fetcher;
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecordingFetcherClient;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.junit.jupiter.api.AfterEach;
@ -23,9 +22,9 @@ import java.util.zip.GZIPInputStream;
import static org.junit.jupiter.api.Assertions.assertEquals;
class WarcRecordingFetcherClientTest {
class WarcRecorderTest {
Path fileName;
WarcRecordingFetcherClient client;
WarcRecorder client;
OkHttpClient httpClient;
@BeforeEach
public void setUp() throws Exception {
@ -34,7 +33,7 @@ class WarcRecordingFetcherClientTest {
.build();
fileName = Files.createTempFile("test", ".warc.gz");
client = new WarcRecordingFetcherClient(fileName, new EdgeDomain("www.marginalia.nu"));
client = new WarcRecorder(fileName);
}
@AfterEach

View File

@ -4,11 +4,13 @@ import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
import nu.marginalia.model.EdgeUrl;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.net.URISyntaxException;
class HttpFetcherTest {
@ -28,16 +30,22 @@ class HttpFetcherTest {
}
@Test
void fetchUTF8() throws URISyntaxException, RateLimitException {
void fetchUTF8() throws URISyntaxException, RateLimitException, IOException {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), ContentTags.empty());
System.out.println(str.contentType);
try (var recorder = new WarcRecorder()) {
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu"), recorder, ContentTags.empty());
System.out.println(str.contentType);
}
}
@Test
void fetchText() throws URISyntaxException, RateLimitException {
void fetchText() throws URISyntaxException, RateLimitException, IOException {
var fetcher = new HttpFetcherImpl("nu.marginalia.edge-crawler");
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), ContentTags.empty());
System.out.println(str);
try (var recorder = new WarcRecorder()) {
var str = fetcher.fetchContent(new EdgeUrl("https://www.marginalia.nu/robots.txt"), recorder, ContentTags.empty());
System.out.println(str.contentType);
}
}
}

View File

@ -4,6 +4,7 @@ import crawlercommons.robots.SimpleRobotRules;
import lombok.SneakyThrows;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.*;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.crawling.model.SerializableCrawlData;
@ -17,11 +18,13 @@ import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
public class CrawlerMockFetcherTest {
@ -60,42 +63,46 @@ public class CrawlerMockFetcherTest {
}
void crawl(CrawlSpecRecord spec, Consumer<SerializableCrawlData> consumer) throws IOException {
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(fetcherMock, spec, recorder, consumer)
.fetch();
}
}
@Test
public void testLemmy() throws URISyntaxException {
public void testLemmy() throws URISyntaxException, IOException {
List<SerializableCrawlData> out = new ArrayList<>();
registerUrlClasspathData(new EdgeUrl("https://startrek.website/"), "mock-crawl-data/lemmy/index.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/c/startrek"), "mock-crawl-data/lemmy/c_startrek.html");
registerUrlClasspathData(new EdgeUrl("https://startrek.website/post/108995"), "mock-crawl-data/lemmy/108995.html");
new CrawlerRetreiver(fetcherMock, new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()), out::add)
.fetch();
crawl(new CrawlSpecRecord("startrek.website", 10, new ArrayList<>()), out::add);
out.forEach(System.out::println);
}
@Test
public void testMediawiki() throws URISyntaxException {
public void testMediawiki() throws URISyntaxException, IOException {
List<SerializableCrawlData> out = new ArrayList<>();
registerUrlClasspathData(new EdgeUrl("https://en.wikipedia.org/"), "mock-crawl-data/mediawiki/index.html");
new CrawlerRetreiver(fetcherMock, new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()), out::add)
.fetch();
crawl(new CrawlSpecRecord("en.wikipedia.org", 10, new ArrayList<>()), out::add);
out.forEach(System.out::println);
}
@Test
public void testDiscourse() throws URISyntaxException {
public void testDiscourse() throws URISyntaxException, IOException {
List<SerializableCrawlData> out = new ArrayList<>();
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/"), "mock-crawl-data/discourse/index.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/telegram-channel-to-idle-on/3501"), "mock-crawl-data/discourse/telegram.html");
registerUrlClasspathData(new EdgeUrl("https://community.tt-rss.org/t/combined-mode-but-grid/4489"), "mock-crawl-data/discourse/grid.html");
new CrawlerRetreiver(fetcherMock, new CrawlSpecRecord("community.tt-rss.org", 100, new ArrayList<>()), out::add)
.fetch();
crawl(new CrawlSpecRecord("community.tt-rss.org", 10, new ArrayList<>()), out::add);
out.forEach(System.out::println);
}
@ -118,7 +125,7 @@ public class CrawlerMockFetcherTest {
}
@Override
public CrawledDocument fetchContent(EdgeUrl url, ContentTags tags) {
public CrawledDocument fetchContent(EdgeUrl url, WarcRecorder recorder, ContentTags tags) {
logger.info("Fetching {}", url);
if (mockData.containsKey(url)) {
return mockData.get(url);
@ -135,7 +142,7 @@ public class CrawlerMockFetcherTest {
}
@Override
public SimpleRobotRules fetchRobotRules(EdgeDomain domain) {
public SimpleRobotRules fetchRobotRules(EdgeDomain domain, WarcRecorder recorder) {
return new SimpleRobotRules();
}

View File

@ -7,6 +7,7 @@ import nu.marginalia.crawl.retreival.CrawlDataReference;
import nu.marginalia.crawl.retreival.CrawlerRetreiver;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.io.CrawledDomainReader;
import nu.marginalia.crawling.io.CrawledDomainWriter;
import nu.marginalia.crawling.model.CrawledDocument;
@ -14,16 +15,17 @@ import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.junit.jupiter.api.*;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("slow")
@ -42,6 +44,53 @@ class CrawlerRetreiverTest {
System.setProperty("http.agent", WmsaHome.getUserAgent().uaString());
}
@Test
public void testWarcOutput() throws IOException {
var specs = CrawlSpecRecord
.builder()
.crawlDepth(5)
.domain("www.marginalia.nu")
.urls(List.of("https://www.marginalia.nu/misc/debian-laptop-install-log/"))
.build();
Path tempFile = null;
try {
tempFile = Files.createTempFile("crawling-process", "warc");
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder(tempFile)) {
new CrawlerRetreiver(httpFetcher, specs, recorder, data::add).fetch();
} catch (IOException ex) {
Assertions.fail(ex);
}
Set<String> requests = new HashSet<>();
Set<String> responses = new HashSet<>();
try (var reader = new WarcReader(tempFile)) {
reader.forEach(record -> {
if (record instanceof WarcRequest req) {
requests.add(req.target());
System.out.println(req.type() + ":" + req.target());
}
else if (record instanceof WarcResponse rsp) {
responses.add(rsp.target());
System.out.println(rsp.type() + ":" + rsp.target());
}
else {
System.out.println(record.type());
}
});
}
assertTrue(requests.contains("https://www.marginalia.nu/misc/debian-laptop-install-log/"));
assertEquals(requests, responses);
}
finally {
if (tempFile != null)
Files.deleteIfExists(tempFile);
}
}
@Test
public void testWithKnownDomains() {
var specs = CrawlSpecRecord
@ -53,7 +102,12 @@ class CrawlerRetreiverTest {
List<SerializableCrawlData> data = new ArrayList<>();
new CrawlerRetreiver(httpFetcher, specs, data::add).fetch();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, specs, recorder, data::add).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
var fetchedUrls =
data.stream().filter(CrawledDocument.class::isInstance)
@ -82,7 +136,12 @@ class CrawlerRetreiverTest {
List<SerializableCrawlData> data = new ArrayList<>();
new CrawlerRetreiver(httpFetcher, specs, data::add).fetch();
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, specs, recorder, data::add).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
data.stream().filter(CrawledDocument.class::isInstance)
.map(CrawledDocument.class::cast)
@ -118,16 +177,23 @@ class CrawlerRetreiverTest {
var writer = new CrawledDomainWriter(out, specs.domain, "idid");
Map<Class<? extends SerializableCrawlData>, List<SerializableCrawlData>> data = new HashMap<>();
new CrawlerRetreiver(httpFetcher, specs, d -> {
data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d);
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
if (Math.random() > 0.5) {
doc.headers = "";
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, specs, recorder, d -> {
data.computeIfAbsent(d.getClass(), k->new ArrayList<>()).add(d);
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
if (Math.random() > 0.5) {
doc.headers = "";
}
}
}
writer.accept(d);
}).fetch();
writer.accept(d);
}).fetch();
}
catch (IOException ex) {
Assertions.fail(ex);
}
writer.close();
var reader = new CrawledDomainReader();
@ -135,12 +201,15 @@ class CrawlerRetreiverTest {
CrawledDomain domain = (CrawledDomain) data.get(CrawledDomain.class).get(0);
domain.doc = data.get(CrawledDocument.class).stream().map(CrawledDocument.class::cast).collect(Collectors.toList());
new CrawlerRetreiver(httpFetcher, specs, d -> {
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
}
}).fetch(new DomainLinks(), new CrawlDataReference(stream));
try (var recorder = new WarcRecorder()) {
new CrawlerRetreiver(httpFetcher, specs, recorder, d -> {
if (d instanceof CrawledDocument doc) {
System.out.println(doc.url + ": " + doc.recrawlState + "\t" + doc.httpStatus);
}
}).fetch(new DomainLinks(), new CrawlDataReference(stream));
}
catch (IOException ex) {
Assertions.fail(ex);
}
}
}