diff --git a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java index 019aa761..7c8f471c 100644 --- a/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java +++ b/code/process-models/crawling-model/src/main/java/nu/marginalia/crawling/body/DocumentBodyExtractor.java @@ -4,12 +4,9 @@ import nu.marginalia.contenttype.ContentType; import nu.marginalia.contenttype.ContentTypeParser; import nu.marginalia.contenttype.DocumentBodyToString; import nu.marginalia.crawling.model.CrawlerDocumentStatus; -import org.apache.commons.io.input.BOMInputStream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.zip.GZIPInputStream; - public class DocumentBodyExtractor { private static final ContentTypeLogic contentTypeLogic = new ContentTypeLogic(); @@ -55,12 +52,6 @@ public class DocumentBodyExtractor { public static DocumentBodyResult asBytes(HttpFetchResult.ResultOk rsp) { try { var byteStream = rsp.getInputStream(); - - if ("gzip".equals(rsp.header("Content-Encoding"))) { - byteStream = new GZIPInputStream(byteStream); - } - byteStream = new BOMInputStream(byteStream); - var contentTypeHeader = rsp.header("Content-Type"); byte[] data = byteStream.readAllBytes(); // size is limited by WarcRecorder diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcInputBuffer.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcInputBuffer.java new file mode 100644 index 00000000..d5864071 --- /dev/null +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcInputBuffer.java @@ -0,0 +1,224 @@ +package nu.marginalia.crawl.retreival.fetcher.warc; + +import okhttp3.Headers; +import okhttp3.Response; +import org.apache.commons.io.input.BOMInputStream; +import org.netpreserve.jwarc.WarcTruncationReason; + +import java.io.*; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Objects; +import java.util.zip.GZIPInputStream; + +/** Input buffer for temporary storage of a HTTP response + * This may be in-memory or on-disk, at the discretion of + * the implementation. + * */ +public abstract class WarcInputBuffer implements AutoCloseable { + protected WarcTruncationReason truncationReason = WarcTruncationReason.NOT_TRUNCATED; + protected Headers headers; + WarcInputBuffer(Headers headers) { + this.headers = headers; + } + + /** If necessary, the stream is closed when the buffer is closed */ + public abstract InputStream read() throws IOException; + + /** The size of the response */ + public abstract int size(); + + public final WarcTruncationReason truncationReason() { return truncationReason; } + + public final Headers headers() { return headers; } + + /** Create a buffer for a response. + * If the response is small and not compressed, it will be stored in memory. + * Otherwise, it will be stored in a temporary file, with compression transparently handled + * and suppressed from the headers. + * If an error occurs, a buffer will be created with no content and an error status. + */ + static WarcInputBuffer forResponse(Response rsp) { + if (rsp == null) + return new ErrorBuffer(); + + try { + String contentLengthHeader = Objects.requireNonNullElse(rsp.header("Content-Length"), "-1"); + int contentLength = Integer.parseInt(contentLengthHeader); + String contentEncoding = rsp.header("Content-Encoding"); + + if (contentEncoding == null && contentLength > 0 && contentLength < 8192) { + // If the content is small and not compressed, we can just read it into memory + return new MemoryBuffer(rsp, contentLength); + } + else { + // Otherwise, we unpack it into a file and read it from there + return new FileBuffer(rsp); + } + } + catch (Exception ex) { + return new ErrorBuffer(rsp); + } + + } + + /** Copy an input stream to an output stream, with a maximum size and time limit */ + protected void copy(InputStream is, OutputStream os) { + long startTime = System.currentTimeMillis(); + long size = 0; + + byte[] buffer = new byte[8192]; + + // Gobble up the BOM if it's there + is = new BOMInputStream(is); + + while (true) { + try { + int n = is.read(buffer); + if (n < 0) break; + size += n; + os.write(buffer, 0, n); + + if (size > WarcRecorder.MAX_SIZE) { + truncationReason = WarcTruncationReason.LENGTH; + break; + } + + if (System.currentTimeMillis() - startTime > WarcRecorder.MAX_TIME) { + truncationReason = WarcTruncationReason.TIME; + break; + } + } catch (IOException e) { + throw new RuntimeException(e); + } + } + } + +} + +/** Pseudo-buffer for when we have an error */ +class ErrorBuffer extends WarcInputBuffer { + public ErrorBuffer() { + super(Headers.of()); + truncationReason = WarcTruncationReason.UNSPECIFIED; + } + + public ErrorBuffer(Response rsp) { + super(rsp.headers()); + truncationReason = WarcTruncationReason.UNSPECIFIED; + } + + @Override + public InputStream read() throws IOException { + return ByteArrayInputStream.nullInputStream(); + } + + @Override + public int size() { + return 0; + } + + @Override + public void close() throws Exception {} +} + +/** Buffer for when we have the response in memory */ +class MemoryBuffer extends WarcInputBuffer { + byte[] data; + public MemoryBuffer(Response response, int size) { + super(response.headers()); + + var outputStream = new ByteArrayOutputStream(size); + + copy(response.body().byteStream(), outputStream); + + data = outputStream.toByteArray(); + } + @Override + public InputStream read() throws IOException { + return new ByteArrayInputStream(data); + } + + @Override + public int size() { + return data.length; + } + + @Override + public void close() throws Exception { + + } +} + +/** Buffer for when we have the response in a file */ +class FileBuffer extends WarcInputBuffer { + private final Path tempFile; + + public FileBuffer(Response response) throws IOException { + super(suppressContentEncoding(response.headers())); + + this.tempFile = Files.createTempFile("rsp", ".html"); + + if (response.body() == null) { + truncationReason = WarcTruncationReason.DISCONNECT; + return; + } + + if ("gzip".equals(response.header("Content-Encoding"))) { + try (var out = Files.newOutputStream(tempFile)) { + copy(new GZIPInputStream(response.body().byteStream()), out); + } + catch (Exception ex) { + truncationReason = WarcTruncationReason.UNSPECIFIED; + } + } + else { + try (var out = Files.newOutputStream(tempFile)) { + copy(response.body().byteStream(), out); + } + catch (Exception ex) { + truncationReason = WarcTruncationReason.UNSPECIFIED; + } + } + } + + private static Headers suppressContentEncoding(Headers headers) { + var builder = new Headers.Builder(); + + headers.toMultimap().forEach((k, values) -> { + if ("Content-Encoding".equalsIgnoreCase(k)) { + return; + } + if ("Transfer-Encoding".equalsIgnoreCase(k)) { + return; + } + for (var value : values) { + builder.add(k, value); + } + }); + + return builder.build(); + } + + + public InputStream read() throws IOException { + return Files.newInputStream(tempFile); + } + + public int size() { + try { + long fileSize = Files.size(tempFile); + if (fileSize > Integer.MAX_VALUE) { + throw new IllegalStateException("File too large"); + } + return (int) fileSize; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() throws Exception { + Files.deleteIfExists(tempFile); + } +} \ No newline at end of file diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java index 40d98d73..6f977e44 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcProtocolReconstructor.java @@ -64,13 +64,13 @@ public class WarcProtocolReconstructor { return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n"; } - static String getResponseHeader(Response response) { + static String getResponseHeader(Response response, long size) { String version = response.protocol() == Protocol.HTTP_1_1 ? "1.1" : "2.0"; String statusCode = String.valueOf(response.code()); String statusMessage = STATUS_CODE_MAP.getOrDefault(response.code(), "Unknown"); - String headerString = getHeadersAsString(response); + String headerString = getHeadersAsString(response, size); return STR."HTTP/\{version} \{statusCode} \{statusMessage}\r\n\{headerString}\r\n\r\n"; } @@ -137,7 +137,7 @@ public class WarcProtocolReconstructor { return joiner.toString(); } - static private String getHeadersAsString(Response response) { + static private String getHeadersAsString(Response response, long responseSize) { StringJoiner joiner = new StringJoiner("\r\n"); response.headers().toMultimap().forEach((k, values) -> { @@ -147,15 +147,24 @@ public class WarcProtocolReconstructor { if (headerCapitalized.startsWith("X-Marginalia")) return; - // Omit Transfer-Encoding header, as we'll be using Content-Length - // instead in the warc file, despite what the server says - if (headerCapitalized.startsWith("Transfer-Encoding")) + // Omit Transfer-Encoding and Content-Encoding headers + if (headerCapitalized.equals("Transfer-Encoding")) + return; + if (headerCapitalized.equals("Content-Encoding")) + return; + + // Since we're transparently decoding gzip, we need to update the Content-Length header + // to reflect the actual size of the response body. We'll do this at the end. + if (headerCapitalized.equals("Content-Length")) return; for (var value : values) { joiner.add(headerCapitalized + ": " + value); } }); + + joiner.add("Content-Length: " + responseSize); + return joiner.toString(); } diff --git a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java index aa9837cf..6326104a 100644 --- a/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java +++ b/code/processes/crawling-process/src/main/java/nu/marginalia/crawl/retreival/fetcher/warc/WarcRecorder.java @@ -30,14 +30,15 @@ import java.util.*; * be reconstructed. */ public class WarcRecorder implements AutoCloseable { - private static final int MAX_TIME = 30_000; - private static final int MAX_SIZE = 1024 * 1024 * 10; + /** Maximum time we'll wait on a single request */ + static final int MAX_TIME = 30_000; + /** Maximum (decompressed) size we'll fetch */ + static final int MAX_SIZE = 1024 * 1024 * 10; + private final WarcWriter writer; private final Path warcFile; private static final Logger logger = LoggerFactory.getLogger(WarcRecorder.class); - private final static ThreadLocal bufferThreadLocal = ThreadLocal.withInitial(() -> new byte[MAX_SIZE]); - private boolean temporaryFile = false; // Affix a version string in case we need to change the format in the future @@ -82,40 +83,27 @@ public class WarcRecorder implements AutoCloseable { String ip; Instant date = Instant.now(); - long startMillis = date.toEpochMilli(); var call = client.newCall(request); - int totalLength = 0; - - WarcTruncationReason truncationReason = null; - - ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(); - cookieInformation.update(client, request.url()); - try (var response = call.execute()) { - var body = response.body(); - InputStream inputStream; + try (var response = call.execute(); + WarcInputBuffer inputBuffer = WarcInputBuffer.forResponse(response)) + { + String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response, inputBuffer.size()); - if (body == null) { - inputStream = null; - truncationReason = WarcTruncationReason.DISCONNECT; - } - else { - inputStream = body.byteStream(); - } + ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(inputBuffer.size() + responseHeaders.length()); + InputStream inputStream = inputBuffer.read(); ip = IpInterceptingNetworkInterceptor.getIpFromResponse(response); - String responseHeaders = WarcProtocolReconstructor.getResponseHeader(response); - responseDataBuffer.put(responseHeaders); responseDataBuffer.updateDigest(responseDigestBuilder, 0, responseDataBuffer.length()); int dataStart = responseDataBuffer.pos(); - while (inputStream != null) { + for (;;) { int remainingLength = responseDataBuffer.remaining(); if (remainingLength == 0) break; @@ -128,16 +116,6 @@ public class WarcRecorder implements AutoCloseable { responseDataBuffer.updateDigest(responseDigestBuilder, startPos, n); responseDataBuffer.updateDigest(payloadDigestBuilder, startPos, n); - totalLength += n; - - if (MAX_TIME > 0 && System.currentTimeMillis() - startMillis > MAX_TIME) { - truncationReason = WarcTruncationReason.TIME; - break; - } - if (MAX_SIZE > 0 && totalLength >= MAX_SIZE) { - truncationReason = WarcTruncationReason.LENGTH; - break; - } } // It looks like this might be the same as requestUri, but it's not; @@ -154,9 +132,7 @@ public class WarcRecorder implements AutoCloseable { if (ip != null) responseBuilder.ipAddress(InetAddress.getByName(ip)); responseBuilder.payloadDigest(payloadDigestBuilder.build()); - - if (truncationReason != null) - responseBuilder.truncated(truncationReason); + responseBuilder.truncated(inputBuffer.truncationReason()); // Build and write the response @@ -178,12 +154,13 @@ public class WarcRecorder implements AutoCloseable { .body(MediaType.HTTP_REQUEST, httpRequestString.getBytes()) .concurrentTo(warcResponse.id()) .build(); + warcRequest.http(); // force HTTP header to be parsed before body is consumed so that caller can use it writer.write(warcRequest); return new HttpFetchResult.ResultOk(responseUri, response.code(), - response.headers(), + inputBuffer.headers(), ip, responseDataBuffer.data, dataStart, @@ -217,7 +194,6 @@ public class WarcRecorder implements AutoCloseable { fakeHeadersBuilder.add(STR."Content-Type: \{contentType}"); fakeHeadersBuilder.add(STR."Content-Length: \{bytes.length}"); - fakeHeadersBuilder.add(STR."Content-Encoding: UTF-8"); if (contentTags.etag() != null) { fakeHeadersBuilder.add(STR."ETag: \{contentTags.etag()}"); } @@ -226,7 +202,7 @@ public class WarcRecorder implements AutoCloseable { } String header = WarcProtocolReconstructor.getResponseHeader(fakeHeadersBuilder.toString(), statusCode); - ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(); + ResponseDataBuffer responseDataBuffer = new ResponseDataBuffer(bytes.length + header.length()); responseDataBuffer.put(header); responseDigestBuilder.update(header); @@ -348,8 +324,8 @@ public class WarcRecorder implements AutoCloseable { private int length = 0; private int pos = 0; - public ResponseDataBuffer() { - data = bufferThreadLocal.get(); + public ResponseDataBuffer(int size) { + data = new byte[size]; } public int pos() { @@ -380,7 +356,7 @@ public class WarcRecorder implements AutoCloseable { } public int remaining() { - return MAX_SIZE - pos; + return data.length - pos; } public void updateDigest(WarcDigestBuilder digestBuilder, int startPos, int n) { @@ -388,9 +364,10 @@ public class WarcRecorder implements AutoCloseable { } public byte[] copyBytes() { - byte[] copy = new byte[length]; - System.arraycopy(data, 0, copy, 0, length); - return copy; + if (length < data.length) + return Arrays.copyOf(data, length); + else + return data; } } @@ -405,3 +382,4 @@ public class WarcRecorder implements AutoCloseable { } } } +