(warc) Clean up parquet conversion

This commit further cleans up the warc->parquet conversion. It fixes issues with redirect handling in WarcRecorder, adds support information about redirects and errors due to probe failure.

It also refactors the fetch result, body extraction and content type abstractions.
This commit is contained in:
Viktor Lofgren 2023-12-14 20:39:40 +01:00
parent 1328bc4938
commit 0889b6d247
23 changed files with 403 additions and 133 deletions

View File

@ -21,6 +21,7 @@ dependencies {
testImplementation libs.bundles.slf4j.test
implementation libs.jsoup
implementation libs.commons.lang3
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@ -1,9 +1,28 @@
package nu.marginalia.contenttype;
import org.apache.commons.lang3.StringUtils;
/** Content type and charset of a document
* @param contentType The content type, e.g. "text/html"
* @param charset The charset, e.g. "UTF-8"
*/
public record ContentType(String contentType, String charset) {
public static ContentType parse(String contentTypeHeader) {
String[] parts = StringUtils.split(contentTypeHeader, ";", 2);
String contentType = parts[0].trim();
String charset = parts.length > 1 ? parts[1].trim() : "UTF-8";
return new ContentType(contentType, charset);
}
public boolean is(String contentType) {
return this.contentType.equalsIgnoreCase(contentType);
}
public String toString() {
if (charset == null || charset.isBlank())
return contentType;
return STR."\{contentType}; charset=\{charset}";
}
}

View File

@ -37,7 +37,9 @@ public class GeoIpDictionary {
throw new RuntimeException(e);
}
finally {
this.notifyAll();
synchronized (this) {
this.notifyAll();
}
}
});
}

View File

@ -1,5 +1,6 @@
package nu.marginalia.crawling.body;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.model.EdgeUrl;
import java.util.List;
@ -37,6 +38,9 @@ public class ContentTypeLogic {
return probableBinaryPattern.test(pathLowerCase);
}
public boolean isAllowableContentType(ContentType contentType) {
return isAllowableContentType(contentType.contentType());
}
public boolean isAllowableContentType(String contentType) {
if (allowAllContentTypes)
return true;

View File

@ -1,5 +1,6 @@
package nu.marginalia.crawling.body;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.ContentTypeParser;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
@ -7,7 +8,6 @@ import org.apache.commons.io.input.BOMInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.zip.GZIPInputStream;
public class DocumentBodyExtractor {
@ -15,28 +15,38 @@ public class DocumentBodyExtractor {
private static final Logger logger = LoggerFactory.getLogger(DocumentBodyExtractor.class);
public static DocumentBodyResult<String> asString(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultOk ok) {
return asString(ok);
}
else if (result instanceof HttpFetchResult.ResultRetained retained) {
return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body());
}
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
}
/** Extract the body from a fetch result as a byte array. */
public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultOk fetchOk) {
return asBytes(fetchOk);
}
else if (result instanceof HttpFetchResult.ResultRetained retained) {
else if (result instanceof HttpFetchResult.Result304ReplacedWithReference retained) {
return new DocumentBodyResult.Ok<>(retained.contentType(), retained.body().getBytes());
}
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "Fetch Result Not Ok");
}
/** Extract the body from a fetch result as a string. This function performs
* content-type checks to ensure that the content-type is such that this operation
* makes sense.
*
* @see ContentTypeLogic#isAllowableContentType(String)
* */
public static DocumentBodyResult<String> asString(HttpFetchResult result) {
return asBytes(result).flatMap(DocumentBodyExtractor::toStringResult);
}
private static DocumentBodyResult<String> toStringResult(ContentType contentType, byte[] bytes) {
if (contentTypeLogic.isAllowableContentType(contentType)) {
return new DocumentBodyResult.Ok<>(contentType, DocumentBodyToString.getStringData(contentType, bytes));
}
else {
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
}
/** Extract the body from a fetch result as a byte array. */
public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult.ResultOk rsp) {
try {
var byteStream = rsp.getInputStream();
@ -51,44 +61,11 @@ public class DocumentBodyExtractor {
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
return new DocumentBodyResult.Ok<>(contentType.contentType(), data);
return new DocumentBodyResult.Ok<>(contentType, data);
} catch (Exception ex) {
logger.error("Failed to extract body", ex);
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "");
}
}
public static DocumentBodyResult<String> asString(HttpFetchResult.ResultOk rsp) {
try {
var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type");
if (contentTypeHeader != null && !contentTypeLogic.isAllowableContentType(contentTypeHeader)) {
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder
var contentType = ContentTypeParser.parseContentType(contentTypeHeader, data);
if (!contentTypeLogic.isAllowableContentType(contentType.contentType())) {
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CONTENT_TYPE, "");
}
if ("Shift_JIS".equalsIgnoreCase(contentType.charset())) {
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.BAD_CHARSET, "");
}
return new DocumentBodyResult.Ok<>(contentType.contentType(), DocumentBodyToString.getStringData(contentType, data));
}
catch (IOException ex) {
logger.error("Failed to extract body", ex);
return new DocumentBodyResult.Error<>(CrawlerDocumentStatus.ERROR, "");
}
}
}

View File

@ -1,17 +1,27 @@
package nu.marginalia.crawling.body;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import java.util.Optional;
import java.util.function.BiFunction;
public sealed interface DocumentBodyResult<T> {
record Ok<T>(String contentType, T body) implements DocumentBodyResult<T> {
record Ok<T>(ContentType contentType, T body) implements DocumentBodyResult<T> {
@Override
public <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper) {
public <T2> Optional<T2> mapOpt(BiFunction<ContentType, T, T2> mapper) {
return Optional.of(mapper.apply(contentType, body));
}
@Override
public <T2> Optional<T2> flatMapOpt(BiFunction<ContentType, T, Optional<T2>> mapper) {
return mapper.apply(contentType, body);
}
@Override
public <T2> DocumentBodyResult<T2> flatMap(BiFunction<ContentType, T, DocumentBodyResult<T2>> mapper) {
return mapper.apply(contentType, body);
}
@Override
public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
@ -20,20 +30,29 @@ public sealed interface DocumentBodyResult<T> {
}
record Error<T>(CrawlerDocumentStatus status, String why) implements DocumentBodyResult<T> {
@Override
public <T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper) {
public <T2> Optional<T2> mapOpt(BiFunction<ContentType, T, T2> mapper) {
return Optional.empty();
}
public <T2> Optional<T2> flatMapOpt(BiFunction<ContentType, T, Optional<T2>> mapper) { return Optional.empty(); }
@Override
@SuppressWarnings("unchecked")
public <T2> DocumentBodyResult<T2> flatMap(BiFunction<ContentType, T, DocumentBodyResult<T2>> mapper) {
return (DocumentBodyResult<T2>) this;
}
@Override
public void ifPresent(ExConsumer<T, Exception> consumer) throws Exception {
}
}
<T2> Optional<T2> mapOpt(BiFunction<String, T, T2> mapper);
<T2> Optional<T2> mapOpt(BiFunction<ContentType, T, T2> mapper);
<T2> Optional<T2> flatMapOpt(BiFunction<ContentType, T, Optional<T2>> mapper);
<T2> DocumentBodyResult<T2> flatMap(BiFunction<ContentType, T, DocumentBodyResult<T2>> mapper);
void ifPresent(ExConsumer<T,Exception> consumer) throws Exception;
interface ExConsumer<T,E extends Exception> {
void accept(String contentType, T t) throws E;
void accept(ContentType contentType, T t) throws E;
}
}

View File

@ -1,5 +1,6 @@
package nu.marginalia.crawling.body;
import nu.marginalia.contenttype.ContentType;
import okhttp3.Headers;
import org.jsoup.Jsoup;
import org.netpreserve.jwarc.MessageHeaders;
@ -11,13 +12,15 @@ import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.util.List;
import java.util.Optional;
/* FIXME: This interface has a very unfortunate name that is not very descriptive.
*/
public sealed interface HttpFetchResult {
boolean isOk();
/** Convert a WarcResponse to a HttpFetchResult */
static HttpFetchResult importWarc(WarcResponse response) {
try {
var http = response.http();
@ -47,6 +50,10 @@ public sealed interface HttpFetchResult {
}
/** Corresponds to a successful retrieval of a document
* from the remote server. Note that byte[] is only borrowed
* and subsequent calls may overwrite the contents of this buffer.
*/
record ResultOk(URI uri,
int statusCode,
Headers headers,
@ -85,23 +92,29 @@ public sealed interface HttpFetchResult {
}
public Optional<Document> parseDocument() throws IOException {
return switch(DocumentBodyExtractor.asString(this)) {
case DocumentBodyResult.Ok<String> ok when "text/html".equalsIgnoreCase(ok.contentType())
-> Optional.of(Jsoup.parse(ok.body()));
default -> Optional.empty();
};
return DocumentBodyExtractor.asString(this).flatMapOpt((contentType, body) -> {
if (contentType.is("text/html")) {
return Optional.of(Jsoup.parse(body));
}
else {
return Optional.empty();
}
});
}
public String header(String name) {
return headers.get(name);
}
public List<String> allHeaders(String name) {
return headers.values(name);
}
};
record ResultRetained(String url, String contentType, String body) implements HttpFetchResult {
/** This is a special case where the document was not fetched
* because it was already in the database. In this case, we
* replace the original data.
*
* @see Result304Raw for the case where the document has not yet been replaced with the reference data.
*/
record Result304ReplacedWithReference(String url, ContentType contentType, String body) implements HttpFetchResult {
public boolean isOk() {
return true;
@ -116,16 +129,29 @@ public sealed interface HttpFetchResult {
}
}
};
/** Fetching resulted in an exception */
record ResultException(Exception ex) implements HttpFetchResult {
public boolean isOk() {
return false;
}
};
record ResultSame() implements HttpFetchResult {
/** Fetching resulted in a HTTP 304, the remote content is identical to
* our reference copy. This will be replaced with a Result304ReplacedWithReference
* at a later stage.
*
* @see Result304ReplacedWithReference
*/
record Result304Raw() implements HttpFetchResult {
public boolean isOk() {
return false;
}
};
/** No result. This is typically injected at a later stage
* of processing, e.g. after filtering out irrelevant responses.
*/
record ResultNone() implements HttpFetchResult {
public boolean isOk() {
return false;

View File

@ -1,8 +1,9 @@
package nu.marginalia.crawling.io;
import com.google.gson.Gson;
import nu.marginalia.crawling.io.format.LegacyFileReadingSerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.WarcReadingSerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.LegacySerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.WarcSerializableCrawlDataStream;
import nu.marginalia.model.gson.GsonFactory;
import java.io.*;
@ -19,10 +20,13 @@ public class CrawledDomainReader {
public static SerializableCrawlDataStream createDataStream(Path fullPath) throws IOException {
String fileName = fullPath.getFileName().toString();
if (fileName.endsWith(".zstd")) {
return new LegacyFileReadingSerializableCrawlDataStream(gson, fullPath.toFile());
return new LegacySerializableCrawlDataStream(gson, fullPath.toFile());
}
else if (fileName.endsWith(".warc") || fileName.endsWith(".warc.gz")) {
return new WarcReadingSerializableCrawlDataStream(fullPath);
return new WarcSerializableCrawlDataStream(fullPath);
}
else if (fileName.endsWith(".parquet")) {
return new ParquetSerializableCrawlDataStream(fullPath);
}
else {
throw new IllegalArgumentException("Unknown file type: " + fullPath);
@ -31,8 +35,12 @@ public class CrawledDomainReader {
/** An iterator-like access to domain data. This must be closed otherwise it will leak off-heap memory! */
public static SerializableCrawlDataStream createDataStream(Path basePath, String domain, String id) throws IOException {
Path parquetPath = CrawlerOutputFile.getParquetPath(basePath, id, domain);
Path warcPath = CrawlerOutputFile.getWarcPath(basePath, id, domain, CrawlerOutputFile.WarcFileVersion.FINAL);
if (Files.exists(parquetPath)) {
return createDataStream(parquetPath);
}
if (Files.exists(warcPath)) {
return createDataStream(warcPath);
}

View File

@ -74,6 +74,15 @@ public class CrawlerOutputFile {
}
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.parquet");
}
public static Path getParquetPath(Path basePath, String id, String domain) {
id = padId(id);
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = basePath.resolve(first).resolve(second);
return destDir.resolve(STR."\{id}-\{filesystemSafeName(domain)}.parquet");
}
public static Path getWarcPath(Path basePath, String id, String domain, WarcFileVersion version) {
id = padId(id);

View File

@ -11,13 +11,16 @@ import nu.marginalia.crawling.model.SerializableCrawlData;
import java.io.*;
import java.nio.file.Path;
public class LegacyFileReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
/** This class is used to read the old format of crawl data, which was zstd-compressed JSON
* with type delimiters between records.
*/
public class LegacySerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private final Gson gson;
private final BufferedReader bufferedReader;
private SerializableCrawlData next = null;
private final Path path;
public LegacyFileReadingSerializableCrawlDataStream(Gson gson, File file) throws IOException {
public LegacySerializableCrawlDataStream(Gson gson, File file) throws IOException {
this.gson = gson;
bufferedReader = new BufferedReader(new InputStreamReader(new ZstdInputStream(new FileInputStream(file), RecyclingBufferPool.INSTANCE)));
path = file.toPath();

View File

@ -0,0 +1,118 @@
package nu.marginalia.crawling.io.format;
import lombok.SneakyThrows;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.CrawlerDomainStatus;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecord;
import nu.marginalia.crawling.parquet.CrawledDocumentParquetRecordFileReader;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.*;
public class ParquetSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private static final Logger logger = LoggerFactory.getLogger(ParquetSerializableCrawlDataStream.class);
private final Iterator<CrawledDocumentParquetRecord> backingIterator;
private Deque<SerializableCrawlData> nextQ = new ArrayDeque<>();
private boolean wroteDomainRecord = false;
private final Path path;
public ParquetSerializableCrawlDataStream(Path file) throws IOException {
path = file;
backingIterator = CrawledDocumentParquetRecordFileReader.stream(file).iterator();
}
@Override
public Path path() {
return path;
}
@Override
@SneakyThrows
public boolean hasNext() {
while (backingIterator.hasNext() && nextQ.isEmpty()) {
var nextRecord = backingIterator.next();
if (!wroteDomainRecord) {
createDomainRecord(nextRecord);
wroteDomainRecord = true;
}
createDocumentRecord(nextRecord);
}
return !nextQ.isEmpty();
}
private void createDomainRecord(CrawledDocumentParquetRecord parquetRecord) throws URISyntaxException {
CrawlerDomainStatus status = CrawlerDomainStatus.OK;
String statusReason = "";
String redirectDomain = null;
if (parquetRecord.contentType.equals("x-marginalia/advisory;state=redir")) {
EdgeUrl crawledUrl = new EdgeUrl(parquetRecord.url);
redirectDomain = crawledUrl.getDomain().toString();
status = CrawlerDomainStatus.REDIRECT;
}
else if (parquetRecord.contentType.equals("x-marginalia/advisory;state=blocked")) {
status = CrawlerDomainStatus.BLOCKED; // FIXME we don't write this yet
}
else if (parquetRecord.contentType.equals("x-marginalia/advisory;state=error")) {
status = CrawlerDomainStatus.ERROR;
statusReason = new String(parquetRecord.body);
}
// FIXME -- cookies
nextQ.add(new CrawledDomain(
parquetRecord.domain,
redirectDomain,
status.toString(),
statusReason,
parquetRecord.ip,
new ArrayList<>(),
new ArrayList<>()
));
}
private void createDocumentRecord(CrawledDocumentParquetRecord nextRecord) {
String bodyString = DocumentBodyToString.getStringData(
ContentType.parse(nextRecord.contentType),
nextRecord.body);
// FIXME -- a lot of these fields are not set properly!
nextQ.add(new CrawledDocument("",
nextRecord.url,
nextRecord.contentType,
"",
nextRecord.httpStatus,
"OK",
"",
"",
bodyString,
"",
nextRecord.url,
null,
""));
}
public void close() throws IOException {
}
@Override
public SerializableCrawlData next() throws IOException {
if (!hasNext())
throw new NoSuchElementException();
return nextQ.poll();
}
}

View File

@ -14,20 +14,17 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.StringJoiner;
import java.util.*;
public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private static final Logger logger = LoggerFactory.getLogger(WarcReadingSerializableCrawlDataStream.class);
public class WarcSerializableCrawlDataStream implements AutoCloseable, SerializableCrawlDataStream {
private static final Logger logger = LoggerFactory.getLogger(WarcSerializableCrawlDataStream.class);
private final WarcReader reader;
private final Iterator<WarcRecord> backingIterator;
private SerializableCrawlData next = null;
private final Path path;
public WarcReadingSerializableCrawlDataStream(Path file) throws IOException {
public WarcSerializableCrawlDataStream(Path file) throws IOException {
path = file;
reader = new WarcReader(file);
WarcXResponseReference.register(reader);
@ -51,17 +48,10 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
else if (nextRecord instanceof Warcinfo warcinfo) {
convertWarcinfo(warcinfo);
}
else if (nextRecord instanceof WarcMetadata metadata) {
convertMetadata(metadata);
}
}
return next != null;
}
private void convertMetadata(WarcMetadata metadata) {
// Nothing to do here for now
}
private void convertWarcinfo(Warcinfo warcinfo) throws IOException {
var headers = warcinfo.fields();
String probeStatus = headers.first("X-WARC-Probe-Status").orElse("");
@ -79,7 +69,10 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
}
// TODO: Fix cookies info somehow
next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip, List.of(), List.of());
next = new CrawledDomain(domain, redirectDomain, status, statusReason, ip,
new ArrayList<>(),
new ArrayList<>()
);
}
private void convertResponse(WarcResponse response) throws IOException {
@ -109,7 +102,7 @@ public class WarcReadingSerializableCrawlDataStream implements AutoCloseable, Se
next = new CrawledDocument(
"",
response.targetURI().toString(),
ok.contentType(),
ok.contentType().toString(),
response.date().toString(),
http.status(),
"OK",

View File

@ -4,10 +4,7 @@ import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRecord;
import org.netpreserve.jwarc.WarcResponse;
import org.netpreserve.jwarc.WarcXResponseReference;
import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,7 +22,16 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
WarcXResponseReference.register(warcReader);
for (var record : warcReader) {
parquetWriter.write(domain, record);
if (record instanceof WarcResponse response) {
parquetWriter.write(domain, response);
}
else if (record instanceof Warcinfo warcinfo) {
parquetWriter.write(domain, warcinfo);
}
else {
logger.warn("Skipping record of type {}", record.type());
}
}
}
catch (Exception ex) {
@ -33,6 +39,34 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
}
}
private void write(String domain, Warcinfo warcinfo) throws IOException {
String selfDomain = warcinfo.fields().first("domain").orElse("");
String ip = warcinfo.fields().first("ip").orElse("");
String probeStatus = warcinfo.fields().first("X-WARC-Probe-Status").orElse("");
if (probeStatus.startsWith("REDIRECT")) {
String redirectDomain = probeStatus.substring("REDIRECT;".length());
write(new CrawledDocumentParquetRecord(selfDomain,
STR."https://\{redirectDomain}/",
ip,
false,
0,
"x-marginalia/advisory;state=redirect",
new byte[0]
));
}
else if (!"OK".equals(probeStatus)) {
write(new CrawledDocumentParquetRecord(selfDomain,
STR."https://\{domain}/",
ip,
false,
0,
"x-marginalia/advisory;state=error",
probeStatus.getBytes()
));
}
}
public CrawledDocumentParquetRecordFileWriter(Path file) throws IOException {
writer = ParquetWriter.writeFile(CrawledDocumentParquetRecord.schema,
file.toFile(), CrawledDocumentParquetRecord.newDehydrator());
@ -42,12 +76,9 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
writer.write(domainData);
}
public void write(String domain, WarcRecord record) throws IOException {
if (!(record instanceof WarcResponse ref)) {
return;
}
public void write(String domain, WarcResponse response) throws IOException {
HttpFetchResult result = HttpFetchResult.importWarc(ref);
HttpFetchResult result = HttpFetchResult.importWarc(response);
if (!(result instanceof HttpFetchResult.ResultOk fetchOk)) {
return;
}
@ -59,7 +90,7 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
if (body instanceof DocumentBodyResult.Ok<byte[]> bodyOk) {
bodyBytes = bodyOk.body();
contentType = bodyOk.contentType();
contentType = bodyOk.contentType().toString();
}
else {
bodyBytes = new byte[0];
@ -68,7 +99,7 @@ public class CrawledDocumentParquetRecordFileWriter implements AutoCloseable {
write(new CrawledDocumentParquetRecord(
domain,
ref.target(),
response.target(),
fetchOk.ipAddress(),
false, // FIXME
fetchOk.statusCode(),

View File

@ -1,5 +1,9 @@
package nu.marginalia.crawling.parquet;
import nu.marginalia.crawling.io.format.ParquetSerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@ -8,6 +12,7 @@ import org.netpreserve.jwarc.net.WarcRecorder;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import static org.junit.jupiter.api.Assertions.*;
@ -25,7 +30,7 @@ class CrawledDocumentParquetRecordFileWriterTest {
}
@Test
void write() throws IOException {
void testWriteRead() throws IOException {
var original = new CrawledDocumentParquetRecord("www.marginalia.nu",
"https://www.marginalia.nu/",
"127.0.0.1",
@ -38,10 +43,36 @@ class CrawledDocumentParquetRecordFileWriterTest {
writer.write(original);
}
try (var stream = CrawledDocumentParquetRecordFileReader.stream(tempFile)) {
var actual = stream.findFirst().orElseThrow();
assertEquals(original, actual);
var items = new ArrayList<SerializableCrawlData>();
try (var stream = new ParquetSerializableCrawlDataStream(tempFile)) {
while (stream.hasNext()) {
items.add(stream.next());
}
}
assertEquals(2, items.size());
var firstItem = items.get(0);
assertInstanceOf(CrawledDomain.class, firstItem);
var domain = (CrawledDomain) firstItem;
assertEquals("www.marginalia.nu", domain.domain);
assertNull(domain.redirectDomain);
assertEquals("OK", domain.crawlerStatus);
assertEquals("", domain.crawlerStatusDesc);
assertEquals(new ArrayList<>(), domain.doc);
assertEquals(new ArrayList<>(), domain.cookies);
var secondItem = items.get(1);
assertInstanceOf(CrawledDocument.class, secondItem);
var document = (CrawledDocument) secondItem;
assertEquals("https://www.marginalia.nu/", document.url);
assertEquals("text/html", document.contentType);
assertEquals("hello world", document.documentBody);
assertEquals(200, document.httpStatus);
assertEquals("https://www.marginalia.nu/", document.canonicalUrl);
}
}

View File

@ -18,6 +18,7 @@ import nu.marginalia.model.EdgeUrl;
import nu.marginalia.converting.processor.logic.links.TopKeywords;
import nu.marginalia.converting.processor.logic.LshDocumentDeduplicator;
import nu.marginalia.model.crawl.HtmlFeature;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -53,9 +54,15 @@ public class DomainProcessor {
}
@SneakyThrows
@Nullable
public ProcessedDomain process(SerializableCrawlDataStream dataStream) {
if (!dataStream.hasNext()) {
return null;
}
var ret = new ProcessedDomain();
List<ProcessedDocument> docs = new ArrayList<>();
Set<String> processedUrls = new HashSet<>();
boolean cookies = false;
String ip = "";
@ -90,7 +97,7 @@ public class DomainProcessor {
}
else if (data instanceof CrawledDocument doc) {
try {
if (doc.url == null)
if (doc.url == null || processedUrls.add(doc.url))
continue;
fixBadCanonicalTag(doc);

View File

@ -3,6 +3,7 @@ package nu.marginalia.converting.writer;
import lombok.SneakyThrows;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.worklog.BatchingWorkLog;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,7 +42,10 @@ public class ConverterWriter implements AutoCloseable {
}
@SneakyThrows
public void accept(ProcessedDomain domain) {
public void accept(@Nullable ProcessedDomain domain) {
if (null == domain)
return;
domainData.put(domain);
}

View File

@ -11,16 +11,16 @@ import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;
import nu.marginalia.crawling.io.format.WarcSerializableCrawlDataStream;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.crawling.model.SerializableCrawlData;
import nu.marginalia.model.crawlspec.CrawlSpecRecord;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.*;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
@ -30,6 +30,8 @@ public class CrawlingThenConvertingIntegrationTest {
private DomainProcessor domainProcessor;
private HttpFetcher httpFetcher;
private Path fileName;
@SneakyThrows
@BeforeAll
public static void setUpAll() {
@ -46,6 +48,12 @@ public class CrawlingThenConvertingIntegrationTest {
domainProcessor = injector.getInstance(DomainProcessor.class);
httpFetcher = new HttpFetcherImpl(WmsaHome.getUserAgent().uaString());
this.fileName = Files.createTempFile("crawling-then-converting", ".warc.gz");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(fileName);
}
@Test
@ -78,10 +86,16 @@ public class CrawlingThenConvertingIntegrationTest {
private CrawledDomain crawl(CrawlSpecRecord specs) throws IOException {
List<SerializableCrawlData> data = new ArrayList<>();
try (var recorder = new WarcRecorder()) {
try (var recorder = new WarcRecorder(fileName)) {
new CrawlerRetreiver(httpFetcher, new DomainProber(d -> true), specs, recorder).fetch();
}
try (var reader = new WarcSerializableCrawlDataStream(fileName)) {
while (reader.hasNext()) {
data.add(reader.next());
}
}
CrawledDomain domain = data.stream().filter(CrawledDomain.class::isInstance).map(CrawledDomain.class::cast).findFirst().get();
data.stream().filter(CrawledDocument.class::isInstance).map(CrawledDocument.class::cast).forEach(domain.doc::add);
return domain;

View File

@ -244,12 +244,10 @@ public class CrawlerMain {
// (mostly a case when migrating from legacy->warc)
reference.delete();
Files.move(newWarcFile, finalWarcFile, StandardCopyOption.REPLACE_EXISTING);
CrawledDocumentParquetRecordFileWriter
.convertWarc(domain, finalWarcFile, parquetFile);
.convertWarc(domain, newWarcFile, parquetFile);
workLog.setJobToFinished(domain, finalWarcFile.toString(), size);
workLog.setJobToFinished(domain, parquetFile.toString(), size);
heartbeat.setProgress(tasksDone.incrementAndGet() / (double) totalTasks);
logger.info("Fetched {}", domain);

View File

@ -5,6 +5,7 @@ import com.google.common.hash.Hashing;
import crawlercommons.robots.SimpleRobotRules;
import lombok.SneakyThrows;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.crawl.retreival.fetcher.ContentTags;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcher;
import nu.marginalia.crawling.body.HttpFetchResult;
@ -247,11 +248,13 @@ public class CrawlerRetreiver implements AutoCloseable {
var contentTags = reference.getContentTags();
var fetchedDoc = tryDownload(top, timer, contentTags);
if (fetchedDoc instanceof HttpFetchResult.ResultSame) {
if (fetchedDoc instanceof HttpFetchResult.Result304Raw) {
var doc = reference.doc();
if (doc != null) {
warcRecorder.writeReferenceCopy(top, doc.contentType, doc.httpStatus, doc.documentBody);
fetchedDoc = new HttpFetchResult.ResultRetained(doc.url, doc.contentType, doc.documentBody);
fetchedDoc = new HttpFetchResult.Result304ReplacedWithReference(doc.url,
new ContentType(doc.contentType, "UTF-8"),
doc.documentBody);
}
}
@ -265,7 +268,7 @@ public class CrawlerRetreiver implements AutoCloseable {
crawlFrontier.addVisited(new EdgeUrl(ok.uri()));
}
}
else if (fetchedDoc instanceof HttpFetchResult.ResultRetained retained) {
else if (fetchedDoc instanceof HttpFetchResult.Result304ReplacedWithReference retained) {
var docOpt = retained.parseDocument();
if (docOpt.isPresent()) {
var doc = docOpt.get();

View File

@ -12,7 +12,6 @@ import nu.marginalia.crawl.retreival.fetcher.socket.FastTerminatingSocketFactory
import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.socket.NoSecuritySSL;
import nu.marginalia.crawling.body.DocumentBodyExtractor;
import nu.marginalia.crawling.body.DocumentBodyResult;
import nu.marginalia.crawling.body.HttpFetchResult;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecorder;
import nu.marginalia.crawling.body.ContentTypeLogic;
@ -183,7 +182,7 @@ public class HttpFetcherImpl implements HttpFetcher {
throw new RateLimitException(retryAfter);
}
if (ok.statusCode() == 304) {
return new HttpFetchResult.ResultSame();
return new HttpFetchResult.Result304Raw();
}
if (ok.statusCode() == 200) {
return ok;
@ -268,7 +267,7 @@ public class HttpFetcherImpl implements HttpFetcher {
return DocumentBodyExtractor.asBytes(result).mapOpt((contentType, body) ->
robotsParser.parseContent(url.toString(),
body,
contentType,
contentType.toString(),
userAgent)
);

View File

@ -69,8 +69,12 @@ public class WarcRecorder implements AutoCloseable {
temporaryFile = true;
}
public HttpFetchResult fetch(OkHttpClient client, Request request) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
URI uri = request.url().uri();
public HttpFetchResult fetch(OkHttpClient client, Request request) throws NoSuchAlgorithmException,
IOException,
URISyntaxException,
InterruptedException
{
URI requestUri = request.url().uri();
WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();
@ -133,7 +137,11 @@ public class WarcRecorder implements AutoCloseable {
}
}
WarcResponse.Builder responseBuilder = new WarcResponse.Builder(uri)
// It looks like this might be the same as requestUri, but it's not;
// it's the URI after resolving redirects.
final URI responseUri = response.request().url().uri();
WarcResponse.Builder responseBuilder = new WarcResponse.Builder(responseUri)
.blockDigest(responseDigestBuilder.build())
.date(date)
.body(MediaType.HTTP_RESPONSE, responseDataBuffer.copyBytes());
@ -155,11 +163,11 @@ public class WarcRecorder implements AutoCloseable {
WarcDigestBuilder requestDigestBuilder = new WarcDigestBuilder();
String httpRequestString = WarcProtocolReconstructor.getHttpRequestString(response.request(), uri);
String httpRequestString = WarcProtocolReconstructor.getHttpRequestString(response.request(), requestUri);
requestDigestBuilder.update(httpRequestString);
WarcRequest warcRequest = new WarcRequest.Builder(uri)
WarcRequest warcRequest = new WarcRequest.Builder(requestUri)
.blockDigest(requestDigestBuilder.build())
.date(date)
.body(MediaType.HTTP_REQUEST, httpRequestString.getBytes())
@ -168,7 +176,7 @@ public class WarcRecorder implements AutoCloseable {
warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(warcRequest);
return new HttpFetchResult.ResultOk(uri,
return new HttpFetchResult.ResultOk(responseUri,
response.code(),
response.headers(),
ip,
@ -177,7 +185,7 @@ public class WarcRecorder implements AutoCloseable {
responseDataBuffer.length() - dataStart);
}
catch (Exception ex) {
logger.warn("Failed to fetch URL {}", uri, ex);
logger.warn("Failed to fetch URL {}", requestUri, ex);
return new HttpFetchResult.ResultException(ex);
}
}

View File

@ -23,9 +23,9 @@ public record DocumentWithReference(
* or if the result was retained via HTTP 304.
*/
public boolean isSame(HttpFetchResult result) {
if (result instanceof HttpFetchResult.ResultSame)
if (result instanceof HttpFetchResult.Result304Raw)
return true;
if (result instanceof HttpFetchResult.ResultRetained)
if (result instanceof HttpFetchResult.Result304ReplacedWithReference)
return true;
if (!(result instanceof HttpFetchResult.ResultOk resultOk))

View File

@ -53,8 +53,6 @@ class WarcRecorderTest {
.addHeader("Accept-Encoding", "gzip")
.get().build());
new GZIPInputStream(Files.newInputStream(fileNameWarc)).transferTo(System.out);
Map<String, String> sampleData = new HashMap<>();
try (var warcReader = new WarcReader(fileNameWarc)) {
warcReader.forEach(record -> {
@ -91,8 +89,6 @@ class WarcRecorderTest {
}
}
}
new GZIPInputStream(Files.newInputStream(fileNameWarc)).transferTo(System.out);
}
@Test