Implement WARC-recording wrapper for OkHttp3 client
This is a first step toward using WARC as an intermediate, flight-recorder-style stage in the crawler, ultimately aimed at making it possible to resume crawls if the crawler is restarted. The component is not yet hooked into anything.

The new OkHttp3 client wrapper class 'WarcRecordingFetcherClient' records HTTP requests and responses for web archiving. Supporting classes 'WarcDigestBuilder', 'IpInterceptingNetworkInterceptor', and 'WarcProtocolReconstructor' were introduced as well. The JWarc dependency was added to build.gradle, and relevant unit tests were added. Some HttpFetcher-adjacent structural changes were also made for better organization.
commit 072b5fcd12
parent fabffa80f0
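For context, here is a minimal usage sketch of the new recorder, modeled on the unit test added in this commit; the client setup, target URL, user agent and domain are taken from that test, and checked exceptions are elided:

    // The OkHttp client must carry the interceptor so the remote IP can be recorded
    OkHttpClient httpClient = new OkHttpClient.Builder()
            .addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
            .build();

    Path warcFile = Files.createTempFile("test", ".warc.gz");

    // Each fetch executes the request through OkHttp and writes paired
    // request/response records to the WARC file
    try (var recorder = new WarcRecordingFetcherClient(warcFile, new EdgeDomain("www.marginalia.nu"))) {
        Optional<WarcResponse> response = recorder.fetch(httpClient,
                new Request.Builder().url("https://www.marginalia.nu/")
                        .addHeader("User-agent", "test.marginalia.nu")
                        .addHeader("Accept-Encoding", "gzip")
                        .get().build());
    }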
@@ -232,4 +232,11 @@ public class EdgeUrl implements Serializable {

        return new URL(this.proto, this.domain.toString(), port, this.path);
    }

    public URI asURI() throws URISyntaxException {
        if (port == null)
            return new URI(this.proto, null, this.domain.toString(), this.path, this.param);
        else
            return new URI(this.proto, null, this.domain.toString(), this.port, this.path, this.param, null);
    }
}
@@ -49,6 +49,7 @@ dependencies {
    implementation libs.guice
    implementation libs.gson
    implementation libs.zstd
    implementation libs.jwarc
    implementation libs.crawlercommons
    implementation libs.okhttp3
    implementation libs.jsoup
@@ -8,9 +8,9 @@ import lombok.SneakyThrows;
import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawl.retreival.Cookies;
import nu.marginalia.crawl.retreival.RateLimitException;
import nu.marginalia.crawl.retreival.fetcher.socket.*;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import nu.marginalia.contenttype.ContentType;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.crawl.retreival.logic.ContentTypeLogic;
@@ -26,10 +26,7 @@ import javax.net.ssl.X509TrustManager;
import java.io.EOFException;
import java.io.IOException;
import java.net.*;
import java.nio.charset.Charset;
import java.nio.charset.IllegalCharsetNameException;
import java.nio.charset.StandardCharsets;
import java.nio.charset.UnsupportedCharsetException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -65,6 +62,7 @@ public class HttpFetcherImpl implements HttpFetcher {
        return builder.sslSocketFactory(NoSecuritySSL.buildSocketFactory(), (X509TrustManager) NoSecuritySSL.trustAllCerts[0])
                .socketFactory(ftSocketFactory)
                .hostnameVerifier(NoSecuritySSL.buildHostnameVerifyer())
                .addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
                .connectionPool(pool)
                .cookieJar(cookies.getJar())
                .followRedirects(true)
@@ -141,8 +139,8 @@ public class HttpFetcherImpl implements HttpFetcher {

        var headBuilder = new Request.Builder().head()
                .addHeader("User-agent", userAgent)
                .url(url.toString())
                .addHeader("Accept-Encoding", "gzip");
                .addHeader("Accept-Encoding", "gzip")
                .url(url.toString());

        var head = headBuilder.build();
        var call = client.newCall(head);
@@ -1,4 +1,4 @@
package nu.marginalia.crawl.retreival.fetcher;
package nu.marginalia.crawl.retreival.fetcher.socket;

import javax.net.SocketFactory;
import java.io.IOException;
@@ -0,0 +1,24 @@
package nu.marginalia.crawl.retreival.fetcher.socket;

import okhttp3.Interceptor;
import okhttp3.Response;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;

public class IpInterceptingNetworkInterceptor implements Interceptor {
    @NotNull
    @Override
    public Response intercept(@NotNull Interceptor.Chain chain) throws IOException {
        String IP = chain.connection().socket().getInetAddress().getHostAddress();

        return chain.proceed(chain.request())
                .newBuilder()
                .addHeader("X-Remote-IP", IP)
                .build();
    }

    public static String getIpFromResponse(Response response) {
        return response.header("X-Remote-IP");
    }
}
@@ -1,4 +1,4 @@
package nu.marginalia.crawl.retreival.fetcher;
package nu.marginalia.crawl.retreival.fetcher.socket;

import lombok.SneakyThrows;
@@ -8,6 +8,8 @@ import java.security.cert.X509Certificate;
public class NoSecuritySSL {

    // Create a trust manager that does not validate certificate chains.
    // We want to accept e.g. self-signed certificates and certificates
    // that are not signed by a CA generally trusted by the system.
    public static final TrustManager[] trustAllCerts = new TrustManager[]{
            new X509TrustManager() {
                @Override
@@ -0,0 +1,29 @@
package nu.marginalia.crawl.retreival.fetcher.warc;

import org.netpreserve.jwarc.WarcDigest;

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class WarcDigestBuilder {
    private final MessageDigest digest;

    private static final String digestAlgorithm = "SHA-1";

    public WarcDigestBuilder() throws NoSuchAlgorithmException {
        this.digest = MessageDigest.getInstance(digestAlgorithm);
    }

    public void update(String s) {
        byte[] bytes = s.getBytes();
        update(bytes, bytes.length);
    }

    public void update(byte[] buffer, int n) {
        digest.update(buffer, 0, n);
    }

    public WarcDigest build() {
        return new WarcDigest(digest);
    }
}
@@ -0,0 +1,127 @@
package nu.marginalia.crawl.retreival.fetcher.warc;

import okhttp3.Protocol;
import okhttp3.Request;
import okhttp3.Response;
import org.apache.commons.lang3.StringUtils;

import java.net.URI;
import java.util.Arrays;
import java.util.Map;
import java.util.StringJoiner;
import java.util.stream.Collectors;

/** We don't have access to the raw HTTP request and response, so we need to reconstruct them
 * as best as possible from the data we have available.
 */
public class WarcProtocolReconstructor {

    static String getHttpRequestString(Request request, URI uri) {
        StringBuilder requestStringBuilder = new StringBuilder();
        requestStringBuilder.append(request.method()).append(" ").append(uri.getPath());
        if (uri.getQuery() != null) {
            requestStringBuilder.append("?").append(uri.getQuery());
        }
        requestStringBuilder.append(" HTTP/1.1\r\n");
        requestStringBuilder.append("Host: ").append(uri.getHost()).append("\r\n");

        request.headers().toMultimap().forEach((k, values) -> {
            for (var value : values) {
                requestStringBuilder.append(capitalizeHeader(k)).append(": ").append(value).append("\r\n");
            }
        });

        return requestStringBuilder.toString();
    }

    static String getResponseHeader(Response response) {
        String version = response.protocol() == Protocol.HTTP_1_1 ? "1.1" : "2.0";

        String statusCode = String.valueOf(response.code());
        String statusMessage = STATUS_CODE_MAP.getOrDefault(response.code(), "Unknown");

        String headerString = getHeadersAsString(response);

        return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n";
    }

    private static final Map<Integer, String> STATUS_CODE_MAP = Map.ofEntries(
            Map.entry(200, "OK"),
            Map.entry(201, "Created"),
            Map.entry(202, "Accepted"),
            Map.entry(203, "Non-Authoritative Information"),
            Map.entry(204, "No Content"),
            Map.entry(205, "Reset Content"),
            Map.entry(206, "Partial Content"),
            Map.entry(207, "Multi-Status"),
            Map.entry(208, "Already Reported"),
            Map.entry(226, "IM Used"),
            Map.entry(300, "Multiple Choices"),
            Map.entry(301, "Moved Permanently"),
            Map.entry(302, "Found"),
            Map.entry(303, "See Other"),
            Map.entry(304, "Not Modified"),
            Map.entry(307, "Temporary Redirect"),
            Map.entry(308, "Permanent Redirect"),
            Map.entry(400, "Bad Request"),
            Map.entry(401, "Unauthorized"),
            Map.entry(403, "Forbidden"),
            Map.entry(404, "Not Found"),
            Map.entry(405, "Method Not Allowed"),
            Map.entry(406, "Not Acceptable"),
            Map.entry(408, "Request Timeout"),
            Map.entry(409, "Conflict"),
            Map.entry(410, "Gone"),
            Map.entry(411, "Length Required"),
            Map.entry(412, "Precondition Failed"),
            Map.entry(413, "Payload Too Large"),
            Map.entry(414, "URI Too Long"),
            Map.entry(415, "Unsupported Media Type"),
            Map.entry(416, "Range Not Satisfiable"),
            Map.entry(417, "Expectation Failed"),
            Map.entry(418, "I'm a teapot"),
            Map.entry(421, "Misdirected Request"),
            Map.entry(426, "Upgrade Required"),
            Map.entry(428, "Precondition Required"),
            Map.entry(429, "Too Many Requests"),
            Map.entry(431, "Request Header Fields Too Large"),
            Map.entry(451, "Unavailable For Legal Reasons"),
            Map.entry(500, "Internal Server Error"),
            Map.entry(501, "Not Implemented"),
            Map.entry(502, "Bad Gateway"),
            Map.entry(503, "Service Unavailable"),
            Map.entry(504, "Gateway Timeout"),
            Map.entry(505, "HTTP Version Not Supported"),
            Map.entry(506, "Variant Also Negotiates"),
            Map.entry(507, "Insufficient Storage"),
            Map.entry(508, "Loop Detected"),
            Map.entry(510, "Not Extended"),
            Map.entry(511, "Network Authentication Required")
    );


    static private String getHeadersAsString(Response response) {
        StringJoiner joiner = new StringJoiner("\r\n");

        response.headers().toMultimap().forEach((k, values) -> {
            String headerCapitalized = capitalizeHeader(k);

            if (headerCapitalized.startsWith("X"))
                return;

            for (var value : values) {
                joiner.add(headerCapitalized + ": " + value);
            }
        });
        return joiner.toString();
    }

    // okhttp gives us flattened headers, so we need to reconstruct Camel-Kebab-Case style
    // for the WARC parser's sake...
    static private String capitalizeHeader(String k) {
        return Arrays.stream(StringUtils.split(k, '-'))
                .map(StringUtils::capitalize)
                .collect(Collectors.joining("-"));
    }

}
@@ -0,0 +1,175 @@
package nu.marginalia.crawl.retreival.fetcher.warc;

import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.model.EdgeDomain;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.netpreserve.jwarc.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.util.Optional;

/** Based on JWarc's fetch method, APL 2.0 license
 * <p></p>
 * This class wraps OkHttp's OkHttpClient and records the HTTP request and response in a WARC file,
 * as best as possible given that not all the data is available at the same time and needs to
 * be reconstructed.
 */
public class WarcRecordingFetcherClient implements AutoCloseable {
    private static final int MAX_TIME = 30_000;
    private static final int MAX_SIZE = 1024 * 1024 * 10;
    private final WarcWriter writer;

    private final EdgeDomain domain;
    private static final Logger logger = LoggerFactory.getLogger(WarcRecordingFetcherClient.class);


    public WarcRecordingFetcherClient(Path warcFile, EdgeDomain domain) throws IOException {
        this.writer = new WarcWriter(warcFile);
        this.domain = domain;
    }

    public Optional<WarcResponse> fetch(OkHttpClient client, Request request) throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
        URI uri = request.url().uri();

        WarcDigestBuilder responseDigestBuilder = new WarcDigestBuilder();
        WarcDigestBuilder payloadDigestBuilder = new WarcDigestBuilder();

        String ip;
        Instant date = Instant.now();
        long startMillis = date.toEpochMilli();

        Path tempFileName = Files.createTempFile(domain.toString(), ".data");

        var call = client.newCall(request);

        int totalLength = 0;

        WarcTruncationReason truncationReason = null;

        try (FileChannel tempFile =
                     (FileChannel) Files.newByteChannel(tempFileName, StandardOpenOption.READ, StandardOpenOption.WRITE);
             var response = call.execute()
        ) {
            var body = response.body();
            InputStream inputStream;

            if (body == null) {
                inputStream = null;
                truncationReason = WarcTruncationReason.DISCONNECT;
            }
            else {
                inputStream = body.byteStream();
            }

            byte[] buf = new byte[8192];

            ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response);

            String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response);
            tempFile.write(ByteBuffer.wrap(responseHeaders.getBytes()));
            responseDigestBuilder.update(responseHeaders);

            while (inputStream != null) {
                int remainingLength;

                if (MAX_SIZE > 0 && MAX_SIZE - totalLength < buf.length) {
                    remainingLength = (MAX_SIZE - totalLength);
                } else {
                    remainingLength = buf.length;
                }

                int n = inputStream.read(buf, 0, remainingLength);
                if (n < 0)
                    break;

                totalLength += n;

                for (int i = 0; i < n; ) {
                    int written = tempFile.write(ByteBuffer.wrap(buf, i, n - i));
                    i += written;
                }

                responseDigestBuilder.update(buf, n);
                payloadDigestBuilder.update(buf, n);

                if (MAX_TIME > 0 && System.currentTimeMillis() - startMillis > MAX_TIME) {
                    truncationReason = WarcTruncationReason.TIME;
                    break;
                }
                if (MAX_SIZE > 0 && totalLength >= MAX_SIZE) {
                    truncationReason = WarcTruncationReason.LENGTH;
                    break;
                }
            }

            tempFile.position(0);
            WarcResponse.Builder responseBuilder = new WarcResponse.Builder(uri)
                    .blockDigest(responseDigestBuilder.build())
                    .date(date)
                    .body(MediaType.HTTP_RESPONSE, tempFile, tempFile.size());

            if (ip != null) responseBuilder.ipAddress(InetAddress.getByName(ip));

            responseBuilder.payloadDigest(payloadDigestBuilder.build());

            if (truncationReason != null)
                responseBuilder.truncated(truncationReason);

            // Build and write the response

            var warcResponse = responseBuilder.build();
            warcResponse.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
            writer.write(warcResponse);

            // Build and write the request

            WarcDigestBuilder requestDigestBuilder = new WarcDigestBuilder();

            String httpRequestString = WarcProtocolReconstructor.getHttpRequestString(response.request(), uri);

            requestDigestBuilder.update(httpRequestString);

            WarcRequest warcRequest = new WarcRequest.Builder(uri)
                    .blockDigest(requestDigestBuilder.build())
                    .date(date)
                    .body(MediaType.HTTP_REQUEST, httpRequestString.getBytes())
                    .concurrentTo(warcResponse.id())
                    .build();
            warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
            writer.write(warcRequest);

            return Optional.of(warcResponse);
        }
        catch (Exception ex) {
            logger.warn("Failed to fetch URL {}", uri, ex);
            return Optional.empty();
        }
        finally {
            Files.deleteIfExists(tempFileName);
        }
    }

    public void close() {
        try {
            writer.close();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }
}
@@ -0,0 +1,70 @@
package nu.marginalia.crawl.retreival.fetcher;

import nu.marginalia.crawl.retreival.fetcher.socket.IpInterceptingNetworkInterceptor;
import nu.marginalia.crawl.retreival.fetcher.warc.WarcRecordingFetcherClient;
import nu.marginalia.model.EdgeDomain;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.netpreserve.jwarc.WarcReader;
import org.netpreserve.jwarc.WarcRequest;
import org.netpreserve.jwarc.WarcResponse;

import java.io.IOException;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;

import static org.junit.jupiter.api.Assertions.assertEquals;

class WarcRecordingFetcherClientTest {
    Path fileName;
    WarcRecordingFetcherClient client;
    OkHttpClient httpClient;
    @BeforeEach
    public void setUp() throws Exception {
        httpClient = new OkHttpClient.Builder()
                .addNetworkInterceptor(new IpInterceptingNetworkInterceptor())
                .build();

        fileName = Files.createTempFile("test", ".warc.gz");
        client = new WarcRecordingFetcherClient(fileName, new EdgeDomain("www.marginalia.nu"));
    }

    @AfterEach
    public void tearDown() throws Exception {
        client.close();
        Files.delete(fileName);
    }

    @Test
    void fetch() throws NoSuchAlgorithmException, IOException, URISyntaxException, InterruptedException {
        client.fetch(httpClient, new Request.Builder().url("https://www.marginalia.nu/")
                .addHeader("User-agent", "test.marginalia.nu")
                .addHeader("Accept-Encoding", "gzip")
                .get().build());

        new GZIPInputStream(Files.newInputStream(fileName)).transferTo(System.out);

        Map<String, String> sampleData = new HashMap<>();
        try (var warcReader = new WarcReader(fileName)) {
            warcReader.forEach(record -> {
                if (record instanceof WarcRequest req) {
                    sampleData.put(record.type(), req.target());
                }
                if (record instanceof WarcResponse rsp) {
                    sampleData.put(record.type(), rsp.target());
                }
            });
        }

        assertEquals("https://www.marginalia.nu/", sampleData.get("request"));
        assertEquals("https://www.marginalia.nu/", sampleData.get("response"));
    }
}