(warc) Improve WARC standard adherence

The WARC specification says the records should transparently remove compression.  This was not done, leading to the WARC typically being a bit of a gzip-Matryoshka.
This commit is contained in:
Viktor Lofgren 2024-02-09 17:29:21 +01:00
parent 1188fe3bf0
commit 8340aa2b6c
4 changed files with 263 additions and 61 deletions

View file

@ -4,12 +4,9 @@ import nu.marginalia.contenttype.ContentType;
import nu.marginalia.contenttype.ContentTypeParser; import nu.marginalia.contenttype.ContentTypeParser;
import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.contenttype.DocumentBodyToString;
import nu.marginalia.crawling.model.CrawlerDocumentStatus; import nu.marginalia.crawling.model.CrawlerDocumentStatus;
import org.apache.commons.io.input.BOMInputStream;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.zip.GZIPInputStream;
public class DocumentBodyExtractor { public class DocumentBodyExtractor {
private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic();
@ -55,12 +52,6 @@ public class DocumentBodyExtractor {
public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult.ResultOk rsp) { public static DocumentBodyResult<byte[]> asBytes(HttpFetchResult.ResultOk rsp) {
try { try {
var byteStream = rsp.getInputStream(); var byteStream = rsp.getInputStream();
if ("gzip".equals(rsp.header("Content-Encoding"))) {
byteStream = new GZIPInputStream(byteStream);
}
byteStream = new BOMInputStream(byteStream);
var contentTypeHeader = rsp.header("Content-Type"); var contentTypeHeader = rsp.header("Content-Type");
byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder

View file

@ -0,0 +1,224 @@
package nu.marginalia.crawl.retreival.fetcher.warc;
import okhttp3.Headers;
import okhttp3.Response;
import org.apache.commons.io.input.BOMInputStream;
import org.netpreserve.jwarc.WarcTruncationReason;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Objects;
import java.util.zip.GZIPInputStream;
/** Input buffer for temporary storage of a HTTP response
* This may be in-memory or on-disk, at the discretion of
* the implementation.
* */
public abstract class WarcInputBuffer implements AutoCloseable {
protected WarcTruncationReason truncationReason = WarcTruncationReason.NOT_TRUNCATED;
protected Headers headers;
WarcInputBuffer(Headers headers) {
this.headers = headers;
}
/** If necessary, the stream is closed when the buffer is closed */
public abstract InputStream read() throws IOException;
/** The size of the response */
public abstract int size();
public final WarcTruncationReason truncationReason() { return truncationReason; }
public final Headers headers() { return headers; }
/** Create a buffer for a response.
* If the response is small and not compressed, it will be stored in memory.
* Otherwise, it will be stored in a temporary file, with compression transparently handled
* and suppressed from the headers.
* If an error occurs, a buffer will be created with no content and an error status.
*/
static WarcInputBuffer forResponse(Response rsp) {
if (rsp == null)
return new ErrorBuffer();
try {
String contentLengthHeader = Objects.requireNonNullElse(rsp.header("Content-Length"), "-1");
int contentLength = Integer.parseInt(contentLengthHeader);
String contentEncoding = rsp.header("Content-Encoding");
if (contentEncoding == null && contentLength > 0 && contentLength < 8192) {
// If the content is small and not compressed, we can just read it into memory
return new MemoryBuffer(rsp, contentLength);
}
else {
// Otherwise, we unpack it into a file and read it from there
return new FileBuffer(rsp);
}
}
catch (Exception ex) {
return new ErrorBuffer(rsp);
}
}
/** Copy an input stream to an output stream, with a maximum size and time limit */
protected void copy(InputStream is, OutputStream os) {
long startTime = System.currentTimeMillis();
long size = 0;
byte[] buffer = new byte[8192];
// Gobble up the BOM if it's there
is = new BOMInputStream(is);
while (true) {
try {
int n = is.read(buffer);
if (n < 0) break;
size += n;
os.write(buffer, 0, n);
if (size > WarcRecorder.MAX_SIZE) {
truncationReason = WarcTruncationReason.LENGTH;
break;
}
if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) {
truncationReason = WarcTruncationReason.TIME;
break;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
/** Pseudo-buffer for when we have an error */
class ErrorBuffer extends WarcInputBuffer {
public ErrorBuffer() {
super(Headers.of());
truncationReason = WarcTruncationReason.UNSPECIFIED;
}
public ErrorBuffer(Response rsp) {
super(rsp.headers());
truncationReason = WarcTruncationReason.UNSPECIFIED;
}
@Override
public InputStream read() throws IOException {
return ByteArrayInputStream.nullInputStream();
}
@Override
public int size() {
return 0;
}
@Override
public void close() throws Exception {}
}
/** Buffer for when we have the response in memory */
class MemoryBuffer extends WarcInputBuffer {
byte[] data;
public MemoryBuffer(Response response, int size) {
super(response.headers());
var outputStream = new ByteArrayOutputStream(size);
copy(response.body().byteStream(), outputStream);
data = outputStream.toByteArray();
}
@Override
public InputStream read() throws IOException {
return new ByteArrayInputStream(data);
}
@Override
public int size() {
return data.length;
}
@Override
public void close() throws Exception {
}
}
/** Buffer for when we have the response in a file */
class FileBuffer extends WarcInputBuffer {
private final Path tempFile;
public FileBuffer(Response response) throws IOException {
super(suppressContentEncoding(response.headers()));
this.tempFile = Files.createTempFile("rsp", ".html");
if (response.body() == null) {
truncationReason = WarcTruncationReason.DISCONNECT;
return;
}
if ("gzip".equals(response.header("Content-Encoding"))) {
try (var out = Files.newOutputStream(tempFile)) {
copy(new GZIPInputStream(response.body().byteStream()), out);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
}
}
else {
try (var out = Files.newOutputStream(tempFile)) {
copy(response.body().byteStream(), out);
}
catch (Exception ex) {
truncationReason = WarcTruncationReason.UNSPECIFIED;
}
}
}
private static Headers suppressContentEncoding(Headers headers) {
var builder = new Headers.Builder();
headers.toMultimap().forEach((k, values) -> {
if ("Content-Encoding".equalsIgnoreCase(k)) {
return;
}
if ("Transfer-Encoding".equalsIgnoreCase(k)) {
return;
}
for (var value : values) {
builder.add(k, value);
}
});
return builder.build();
}
public InputStream read() throws IOException {
return Files.newInputStream(tempFile);
}
public int size() {
try {
long fileSize = Files.size(tempFile);
if (fileSize > Integer.MAX_VALUE) {
throw new IllegalStateException("File too large");
}
return (int) fileSize;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Override
public void close() throws Exception {
Files.deleteIfExists(tempFile);
}
}

View file

@ -64,13 +64,13 @@ public class WarcProtocolReconstructor {
return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n"; return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n";
} }
static String getResponseHeader(Response response) { static String getResponseHeader(Response response, long size) {
String version = response.protocol() == Protocol.HTTP_1_1 ? "1.1" : "2.0"; String version = response.protocol() == Protocol.HTTP_1_1 ? "1.1" : "2.0";
String statusCode = String.valueOf(response.code()); String statusCode = String.valueOf(response.code());
String statusMessage = STATUS_CODE_MAP.getOrDefault(response.code(), "Unknown"); String statusMessage = STATUS_CODE_MAP.getOrDefault(response.code(), "Unknown");
String headerString = getHeadersAsString(response); String headerString = getHeadersAsString(response, size);
return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n"; return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n";
} }
@ -137,7 +137,7 @@ public class WarcProtocolReconstructor {
return joiner.toString(); return joiner.toString();
} }
static private String getHeadersAsString(Response response) { static private String getHeadersAsString(Response response, long responseSize) {
StringJoiner joiner = new StringJoiner("\r\n"); StringJoiner joiner = new StringJoiner("\r\n");
response.headers().toMultimap().forEach((k, values) -> { response.headers().toMultimap().forEach((k, values) -> {
@ -147,15 +147,24 @@ public class WarcProtocolReconstructor {
if (headerCapitalized.startsWith("X-Marginalia")) if (headerCapitalized.startsWith("X-Marginalia"))
return; return;
// Omit Transfer-Encoding header, as we'll be using Content-Length // Omit Transfer-Encoding and Content-Encoding headers
// instead in the warc file, despite what the server says if (headerCapitalized.equals("Transfer-Encoding"))
if (headerCapitalized.startsWith("Transfer-Encoding")) return;
if (headerCapitalized.equals("Content-Encoding"))
return;
// Since we're transparently decoding gzip, we need to update the Content-Length header
// to reflect the actual size of the response body. We'll do this at the end.
if (headerCapitalized.equals("Content-Length"))
return; return;
for (var value : values) { for (var value : values) {
joiner.add(headerCapitalized + ": " + value); joiner.add(headerCapitalized + ": " + value);
} }
}); });
joiner.add("Content-Length: " + responseSize);
return joiner.toString(); return joiner.toString();
} }

View file

@ -30,14 +30,15 @@ import java.util.*;
* be reconstructed. * be reconstructed.
*/ */
public class WarcRecorder implements AutoCloseable { public class WarcRecorder implements AutoCloseable {
private static final int MAX_TIME = 30_000; /** Maximum time we'll wait on a single request */
private static final int MAX_SIZE = 1024 * 1024 * 10; static final int MAX_TIME = 30_000;
/** Maximum (decompressed) size we'll fetch */
static final int MAX_SIZE = 1024 * 1024 * 10;
private final WarcWriter writer; private final WarcWriter writer;
private final Path warcFile; private final Path warcFile;
private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class); private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class);
private final static ThreadLocal<byte[]> bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]);
private boolean temporaryFile = false; private boolean temporaryFile = false;
// Affix a version string in case we need to change the format in the future // Affix a version string in case we need to change the format in the future
@ -82,40 +83,27 @@ public class WarcRecorder implements AutoCloseable {
String ip; String ip;
Instant date = Instant.now(); Instant date = Instant.now();
long startMillis = date.toEpochMilli();
var call = client.newCall(request); var call = client.newCall(request);
int totalLength = 0;
WarcTruncationReason truncationReason = null;
ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer();
cookieInformation.update(client, request.url()); cookieInformation.update(client, request.url());
try (var response = call.execute()) { try (var response = call.execute();
var body = response.body(); WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response))
InputStream inputStream; {
String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response, inputBuffer.size());
if (body == null) { ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(inputBuffer.size() + responseHeaders.length());
inputStream = null; InputStream inputStream = inputBuffer.read();
truncationReason = WarcTruncationReason.DISCONNECT;
}
else {
inputStream = body.byteStream();
}
ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response); ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response);
String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response);
responseDataBuffer.put(responseHeaders); responseDataBuffer.put(responseHeaders);
responseDataBuffer.updateDigest(responseDigestBuilder, 0, responseDataBuffer.length()); responseDataBuffer.updateDigest(responseDigestBuilder, 0, responseDataBuffer.length());
int dataStart = responseDataBuffer.pos(); int dataStart = responseDataBuffer.pos();
while (inputStream != null) { for (;;) {
int remainingLength = responseDataBuffer.remaining(); int remainingLength = responseDataBuffer.remaining();
if (remainingLength == 0) if (remainingLength == 0)
break; break;
@ -128,16 +116,6 @@ public class WarcRecorder implements AutoCloseable {
responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n); responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n);
responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n); responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n);
totalLength += n;
if (MAX_TIME > 0 && System.currentTimeMillis() - startMillis > MAX_TIME) {
truncationReason = WarcTruncationReason.TIME;
break;
}
if (MAX_SIZE > 0 && totalLength >= MAX_SIZE) {
truncationReason = WarcTruncationReason.LENGTH;
break;
}
} }
// It looks like this might be the same as requestUri, but it's not; // It looks like this might be the same as requestUri, but it's not;
@ -154,9 +132,7 @@ public class WarcRecorder implements AutoCloseable {
if (ip != null) responseBuilder.ipAddress(InetAddress.getByName(ip)); if (ip != null) responseBuilder.ipAddress(InetAddress.getByName(ip));
responseBuilder.payloadDigest(payloadDigestBuilder.build()); responseBuilder.payloadDigest(payloadDigestBuilder.build());
responseBuilder.truncated(inputBuffer.truncationReason());
if (truncationReason != null)
responseBuilder.truncated(truncationReason);
// Build and write the response // Build and write the response
@ -178,12 +154,13 @@ public class WarcRecorder implements AutoCloseable {
.body(MediaType.HTTP_REQUEST, httpRequestString.getBytes()) .body(MediaType.HTTP_REQUEST, httpRequestString.getBytes())
.concurrentTo(warcResponse.id()) .concurrentTo(warcResponse.id())
.build(); .build();
warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it
writer.write(warcRequest); writer.write(warcRequest);
return new HttpFetchResult.ResultOk(responseUri, return new HttpFetchResult.ResultOk(responseUri,
response.code(), response.code(),
response.headers(), inputBuffer.headers(),
ip, ip,
responseDataBuffer.data, responseDataBuffer.data,
dataStart, dataStart,
@ -217,7 +194,6 @@ public class WarcRecorder implements AutoCloseable {
fakeHeadersBuilder.add(STR."Content-Type: \{contentType}"); fakeHeadersBuilder.add(STR."Content-Type: \{contentType}");
fakeHeadersBuilder.add(STR."Content-Length: \{bytes.length}"); fakeHeadersBuilder.add(STR."Content-Length: \{bytes.length}");
fakeHeadersBuilder.add(STR."Content-Encoding: UTF-8");
if (contentTags.etag() != null) { if (contentTags.etag() != null) {
fakeHeadersBuilder.add(STR."ETag: \{contentTags.etag()}"); fakeHeadersBuilder.add(STR."ETag: \{contentTags.etag()}");
} }
@ -226,7 +202,7 @@ public class WarcRecorder implements AutoCloseable {
} }
String header = WarcProtocolReconstructor.getResponseHeader(fakeHeadersBuilder.toString(), statusCode); String header = WarcProtocolReconstructor.getResponseHeader(fakeHeadersBuilder.toString(), statusCode);
ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(); ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(bytes.length + header.length());
responseDataBuffer.put(header); responseDataBuffer.put(header);
responseDigestBuilder.update(header); responseDigestBuilder.update(header);
@ -348,8 +324,8 @@ public class WarcRecorder implements AutoCloseable {
private int length = 0; private int length = 0;
private int pos = 0; private int pos = 0;
public ResponseDataBuffer() { public ResponseDataBuffer(int size) {
data = bufferThreadLocal.get(); data = new byte[size];
} }
public int pos() { public int pos() {
@ -380,7 +356,7 @@ public class WarcRecorder implements AutoCloseable {
} }
public int remaining() { public int remaining() {
return MAX_SIZE - pos; return data.length - pos;
} }
public void updateDigest(WarcDigestBuilder digestBuilder, int startPos, int n) { public void updateDigest(WarcDigestBuilder digestBuilder, int startPos, int n) {
@ -388,9 +364,10 @@ public class WarcRecorder implements AutoCloseable {
} }
public byte[] copyBytes() { public byte[] copyBytes() {
byte[] copy = new byte[length]; if (length < data.length)
System.arraycopy(data, 0, copy, 0, length); return Arrays.copyOf(data, length);
return copy; else
return data;
} }
} }
@ -405,3 +382,4 @@ public class WarcRecorder implements AutoCloseable {
} }
} }
} }