From 67aa20ea2c3c9a2fbaab86142d1ee2501fc090f8 Mon Sep 17 00:00:00 2001
From: Viktor Lofgren
Date: Mon, 26 Feb 2024 10:32:49 +0100
Subject: [PATCH] (array) Attempting to debug strange errors

---
 .../ReverseIndexBTreeTransformer.java        |   2 +-
 .../index/construction/ReversePreindex.java  |  37 ++--
 .../ReversePreindexWordSegments.java         |  13 +-
 .../array/page/SegmentLongArray.java         |   6 +-
 .../array/page/UnsafeLongArray.java          | 173 ++++++++++++++----
 5 files changed, 169 insertions(+), 62 deletions(-)

diff --git a/code/index/index-reverse/java/nu/marginalia/index/construction/ReverseIndexBTreeTransformer.java b/code/index/index-reverse/java/nu/marginalia/index/construction/ReverseIndexBTreeTransformer.java
index 2f5c05f4..4ace48a9 100644
--- a/code/index/index-reverse/java/nu/marginalia/index/construction/ReverseIndexBTreeTransformer.java
+++ b/code/index/index-reverse/java/nu/marginalia/index/construction/ReverseIndexBTreeTransformer.java
@@ -30,7 +30,7 @@ public class ReverseIndexBTreeTransformer implements LongIOTransformer {
 
     @Override
     public long transform(long pos, long end) throws IOException {
-        final int size = (int) (end - start) / entrySize;
+        final int size = (int) ((end - start) / entrySize);
 
         if (size == 0) {
             return -1;
diff --git a/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindex.java b/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindex.java
index 22fc1431..ac39e817 100644
--- a/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindex.java
+++ b/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindex.java
@@ -92,7 +92,8 @@ public class ReversePreindex {
 
         LongArray wordIds = segments.wordIds;
 
-        assert offsets.size() == wordIds.size() : "Offsets and word-ids of different size";
+        if (offsets.size() != wordIds.size())
+            throw new IllegalStateException("Offsets and word-ids of different size");
         if (offsets.size() > Integer.MAX_VALUE) {
             throw new IllegalStateException("offsets.size() too big!");
         }
@@ -137,7 +138,7 @@ public class ReversePreindex {
 
         Path docsFile = Files.createTempFile(destDir, "docs", ".dat");
 
-        LongArray mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, 2 * (left.documents.size() + right.documents.size()));
+        LongArray mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, left.documents.size() + right.documents.size());
 
         leftIter.next();
         rightIter.next();
@@ -180,9 +181,15 @@ public class ReversePreindex {
         }
 
-        assert !leftIter.isPositionBeforeEnd() : "Left has more to go";
-        assert !rightIter.isPositionBeforeEnd() : "Right has more to go";
-        assert !mergingIter.canPutMore() : "Source iters ran dry before merging iter";
+        if (leftIter.isPositionBeforeEnd())
+            throw new IllegalStateException("Left has more to go");
+        if (rightIter.isPositionBeforeEnd())
+            throw new IllegalStateException("Right has more to go");
+        if (mergingIter.canPutMore())
+            throw new IllegalStateException("Source iters ran dry before merging iter");
+
+
+        mergingSegment.force();
 
         // We may have overestimated the size of the merged docs size in the case there were
         // duplicates in the data, so we need to shrink it to the actual size we wrote.
@@ -190,8 +197,6 @@ public class ReversePreindex {
         mergedDocuments = shrinkMergedDocuments(mergedDocuments, docsFile, 2 * mergingSegment.totalSize());
 
-        mergingSegment.force();
-
         return new ReversePreindex(
                 mergingSegment,
                 new ReversePreindexDocuments(mergedDocuments, docsFile)
         );
@@ -233,16 +238,15 @@ public class ReversePreindex {
         mergedDocuments.force();
 
         long beforeSize = mergedDocuments.size();
-        try (var bc = Files.newByteChannel(docsFile, StandardOpenOption.WRITE)) {
-            bc.truncate(sizeLongs * 8);
-        }
-        long afterSize = mergedDocuments.size();
-        mergedDocuments.close();
-
-        mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, sizeLongs);
-
+        long afterSize = sizeLongs * 8;
         if (beforeSize != afterSize) {
+            mergedDocuments.close();
+            try (var bc = Files.newByteChannel(docsFile, StandardOpenOption.WRITE)) {
+                bc.truncate(sizeLongs * 8);
+            }
+            logger.info("Shrunk {} from {}b to {}b", docsFile, beforeSize, afterSize);
+            mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, sizeLongs);
         }
 
         return mergedDocuments;
     }
@@ -291,7 +295,8 @@ public class ReversePreindex {
         boolean putNext = mergingIter.putNext(size / 2);
         boolean iterNext = sourceIter.next();
 
-        assert putNext || !iterNext : "Source iterator ran out before dest iterator?!";
+        if (!putNext && iterNext)
+            throw new IllegalStateException("Source iterator ran out before dest iterator?!");
 
         return iterNext;
     }
diff --git a/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindexWordSegments.java b/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindexWordSegments.java
index bcfe486e..04303210 100644
--- a/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindexWordSegments.java
+++ b/code/index/index-reverse/java/nu/marginalia/index/construction/ReversePreindexWordSegments.java
@@ -38,6 +38,9 @@ public class ReversePreindexWordSegments {
      * and each value is the start offset of the data.
      */
     public Long2LongOpenHashMap asMap(int recordSize) {
+        if (wordIds.size() > Integer.MAX_VALUE)
+            throw new IllegalArgumentException("Cannot create a map with more than Integer.MAX_VALUE entries");
+
         Long2LongOpenHashMap ret = new Long2LongOpenHashMap((int) wordIds.size(), 0.75f);
         var iter = iterator(recordSize);
 
@@ -62,7 +65,7 @@ public class ReversePreindexWordSegments {
         // Create the words file by iterating over the map and inserting them into
         // the words file in whatever bizarro hash table order they appear in
-        int i = 0;
+        long i = 0;
         LongIterator iter = countsMap.keySet().iterator();
         while (iter.hasNext()) {
             words.set(i++, iter.nextLong());
         }
@@ -120,8 +123,8 @@ public class ReversePreindexWordSegments {
             this.fileSize = wordIds.size();
         }
 
-        private int i = -1;
-        public int idx() {
+        private long i = -1;
+        public long idx() {
             return i;
         }
         public boolean next() {
@@ -166,8 +169,8 @@ public class ReversePreindexWordSegments {
             this.wordId = wordIds.get(0);
         }
 
-        private int i = 0;
-        public int idx() {
+        private long i = 0;
+        public long idx() {
             return i;
         }
 
diff --git a/code/libraries/array/java/nu/marginalia/array/page/SegmentLongArray.java b/code/libraries/array/java/nu/marginalia/array/page/SegmentLongArray.java
index 21044b68..5e3b8a59 100644
--- a/code/libraries/array/java/nu/marginalia/array/page/SegmentLongArray.java
+++ b/code/libraries/array/java/nu/marginalia/array/page/SegmentLongArray.java
@@ -154,7 +154,11 @@ public class SegmentLongArray implements PartitionPage, LongArray {
 
     @Override
     public void transferFrom(FileChannel source, long sourceStart, long arrayStart, long arrayEnd) throws IOException {
-        final int stride = 1024*1204*128; // Copy 1 GB at a time 'cause byte buffers are 'a byte buffering
+        final int stride = 1024*1024*128; // Copy 128 MB at a time 'cause byte buffers are 'a byte buffering
+
+        if (source.size() / 8 < sourceStart + (arrayEnd - arrayStart)) {
+            throw new IndexOutOfBoundsException(STR."Source channel too small: \{source.size()} < \{sourceStart + (arrayEnd - arrayStart)}");
+        }
 
         long ss = sourceStart;
         for (long as = arrayStart; as < arrayEnd; as += stride, ss += stride) {
diff --git a/code/libraries/array/java/nu/marginalia/array/page/UnsafeLongArray.java b/code/libraries/array/java/nu/marginalia/array/page/UnsafeLongArray.java
index 8ba7182d..4ef0da02 100644
--- a/code/libraries/array/java/nu/marginalia/array/page/UnsafeLongArray.java
+++ b/code/libraries/array/java/nu/marginalia/array/page/UnsafeLongArray.java
@@ -2,8 +2,11 @@ package nu.marginalia.array.page;
 
 import nu.marginalia.array.ArrayRangeReference;
 import nu.marginalia.array.LongArray;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import sun.misc.Unsafe;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.lang.foreign.Arena;
@@ -12,7 +15,6 @@ import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
-import java.nio.file.OpenOption;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 
@@ -23,9 +25,13 @@ import static java.lang.foreign.ValueLayout.JAVA_LONG;
 public class UnsafeLongArray implements PartitionPage, LongArray {
 
     private static final Unsafe unsafe = UnsafeProvider.getUnsafe();
+    private static final Logger logger = LoggerFactory.getLogger(UnsafeLongArray.class);
 
     @Nullable
     private final Arena arena;
+    @Nullable
+    private final FileChannel channel;
+
     private final MemorySegment segment;
     private boolean closed;
 
@@ -33,6 +39,15 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
                     @Nullable Arena arena) {
         this.segment = segment;
         this.arena = arena;
+        this.channel = null;
+    }
+
+    UnsafeLongArray(MemorySegment segment,
+                    @Nonnull FileChannel channel,
+                    @Nullable Arena arena) {
+        this.segment = segment;
+        this.arena = arena;
+        this.channel = channel;
     }
 
     public static UnsafeLongArray onHeap(Arena arena, long size) {
@@ -40,38 +55,26 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
     }
 
     public static UnsafeLongArray fromMmapReadOnly(Arena arena, Path file, long offset, long size) throws IOException {
-        return new UnsafeLongArray(
-                mmapFile(arena, file, offset, size, FileChannel.MapMode.READ_ONLY, StandardOpenOption.READ),
-                arena);
-    }
-
-    public static UnsafeLongArray fromMmapReadWrite(Arena arena, Path file, long offset, long size) throws IOException {
-
-        return new UnsafeLongArray(
-                mmapFile(arena, file, offset, size, FileChannel.MapMode.READ_WRITE,
-                        StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE),
-                arena);
-    }
-
-    private static MemorySegment mmapFile(Arena arena,
-                                          Path file,
-                                          long offset,
-                                          long size,
-                                          FileChannel.MapMode mode,
-                                          OpenOption... openOptions) throws IOException
-    {
-        try (var channel = (FileChannel) Files.newByteChannel(file, openOptions)) {
-
-            return channel.map(mode,
-                    JAVA_LONG.byteSize() * offset,
-                    JAVA_LONG.byteSize() * size,
-                    arena);
+        try (var channel = (FileChannel) Files.newByteChannel(file, StandardOpenOption.READ)) {
+            return new UnsafeLongArray(channel.map(FileChannel.MapMode.READ_ONLY,
+                    JAVA_LONG.byteSize() * offset, JAVA_LONG.byteSize() * size,
+                    arena), arena);
         }
         catch (IOException ex) {
             throw new IOException("Failed to map file " + file + " (" + offset + ":" + size + ")", ex);
         }
     }
 
+    public static UnsafeLongArray fromMmapReadWrite(Arena arena, Path file, long offset, long size) throws IOException {
+        var channel = (FileChannel) Files.newByteChannel(file,
+                StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
+        var segment = channel.map(FileChannel.MapMode.READ_WRITE,
+                JAVA_LONG.byteSize() * offset, JAVA_LONG.byteSize() * size,
+                arena);
+
+        return new UnsafeLongArray(segment, channel, arena);
+    }
+
     @Override
     public LongArray range(long start, long end) {
         return new UnsafeLongArray(
@@ -122,6 +125,15 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
         if (arena != null && !closed) {
             arena.close();
         }
+        if (channel != null && !closed) {
+            try {
+                channel.close();
+            }
+            catch (IOException ex) {
+                throw new RuntimeException("Failed to close channel", ex);
+            }
+        }
+
         closed = true;
     }
 
@@ -149,6 +161,13 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
     public void force() {
         if (segment.isMapped()) {
             segment.force();
+            try {
+                if (channel != null) {
+                    channel.force(false);
+                }
+            } catch (IOException e) {
+                throw new RuntimeException("Failed to force channel", e);
+            }
         }
     }
 
@@ -156,26 +175,102 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
         return new ArrayRangeReference<>(this, start, end);
     }
 
-    @Override
-    public void transferFrom(FileChannel source, long sourceStart, long arrayStart, long arrayEnd) throws IOException {
+    public void chanelChannelTransfer(FileChannel source,
+                                      long sourceStartL,
+                                      long arrayStartL,
+                                      long arrayEndL) throws IOException {
 
-        final int stride = 1024*1204*128; // Copy 1 GB at a time 'cause byte buffers are 'a byte buffering
+        assert channel != null;
 
-        long ss = sourceStart;
-        for (long as = arrayStart; as < arrayEnd; as += stride, ss += stride) {
-            long ae = Math.min(as + stride, arrayEnd);
+        final int B_per_L = (int) JAVA_LONG.byteSize();
 
-            long index = as * JAVA_LONG.byteSize();
-            long length = (ae - as) * JAVA_LONG.byteSize();
+        final int strideB = 128*1024*1024; // Copy in 128 MB chunks
 
-            var bufferSlice = segment.asSlice(index, length).asByteBuffer();
+        final long destStartB = arrayStartL * B_per_L;
+        final long destEndB = arrayEndL * B_per_L;
+        final long lengthB = destEndB - destStartB;
 
-            long startPos = ss * JAVA_LONG.byteSize();
-            while (bufferSlice.position() < bufferSlice.capacity()) {
-                source.read(bufferSlice, startPos + bufferSlice.position());
+        final long sourceStartB = sourceStartL * B_per_L;
+        final long sourceEndB = sourceStartB + lengthB;
+
+
+        if (sourceStartB > sourceEndB)
+            throw new IndexOutOfBoundsException("Source start after end");
+        if (sourceStartB > source.size())
+            throw new IndexOutOfBoundsException("Source channel too small, start " + sourceStartB + " < input size " + source.size());
+        if (sourceEndB > source.size())
+            throw new IndexOutOfBoundsException("Source channel too small, end " + sourceEndB + " < input size " + source.size());
+
+        long destIndexB = destStartB;
+
+        source.position(sourceStartB);
+
+        while (destIndexB < destEndB)
+        {
+            long stepSizeB = Math.min(destIndexB + strideB, destEndB);
+            long copyLengthB = (stepSizeB - destIndexB);
+
+            long transferred = channel.transferFrom(source, destIndexB, copyLengthB);
+            if (transferred != copyLengthB) {
+                logger.warn("Less than {} bytes were copied: {}", copyLengthB, transferred);
             }
+
+            destIndexB += copyLengthB;
+        }
+    }
+
+    @Override
+    public void transferFrom(FileChannel source,
+                             long sourceStartL,
+                             long arrayStartL,
+                             long arrayEndL) throws IOException {
+
+
+        if (channel != null) {
+            chanelChannelTransfer(source, sourceStartL, arrayStartL, arrayEndL);
+            return;
         }
+        final int B_per_L = (int) JAVA_LONG.byteSize();
+
+        final int strideB = 1024*1024*1024; // Copy 1 GB at a time
+
+        final long arrayStartB = arrayStartL * B_per_L;
+        final long arrayEndB = arrayEndL * B_per_L;
+        final long arrayLengthB = arrayEndB - arrayStartB;
+
+        final long sourceStartB = sourceStartL * B_per_L;
+        final long sourceEndB = sourceStartB + arrayLengthB;
+
+
+        if (sourceStartB > sourceEndB)
+            throw new IndexOutOfBoundsException("Source start after end");
+        if (sourceStartB > source.size())
+            throw new IndexOutOfBoundsException("Source channel too small, start " + sourceStartB + " < input size " + source.size());
+        if (sourceEndB > source.size())
+            throw new IndexOutOfBoundsException("Source channel too small, end " + sourceEndB + " < input size " + source.size());
+
+        long channelIndexB = sourceStartB;
+        long segmentIndexB = arrayStartB;
+
+        while (segmentIndexB < arrayEndB)
+        {
+            long segmentEndB = Math.min(segmentIndexB + strideB, arrayEndB);
+            long lengthB = (segmentEndB - segmentIndexB);
+
+            var bufferSlice = segment.asSlice(segmentIndexB, lengthB).asByteBuffer();
+
+            while (bufferSlice.position() < bufferSlice.capacity()) {
+                if (source.position() + bufferSlice.capacity() > sourceEndB)
+                    throw new IndexOutOfBoundsException("Source channel too small");
+
+                if (source.read(bufferSlice, channelIndexB + bufferSlice.position()) < 0)
+                    throw new IOException("Failed to read from source");
+            }
+
+            channelIndexB += lengthB;
+            segmentIndexB += lengthB;
+        }
     }
 }
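
A note on the first hunk: in "(int) (end - start) / entrySize" the cast binds tighter than the division, so the 64-bit difference is truncated to int before it is divided. For ranges larger than 2 GB that silently produces a wrong size, which is one plausible source of the "strange errors" named in the subject. The snippet below is a standalone illustration only; the class name and values are made up and are not part of the patch.

public class CastPrecedenceDemo {
    public static void main(String[] args) {
        long start = 0;
        long end = 6_000_000_000L;  // a byte range larger than Integer.MAX_VALUE
        int entrySize = 8;

        // Old expression: the cast truncates (end - start) to int *before* dividing.
        int broken = (int) (end - start) / entrySize;

        // New expression: the division happens in 64-bit arithmetic, then the cast.
        int fixed = (int) ((end - start) / entrySize);

        System.out.println(broken); // 213129088 -- garbage from the truncated difference
        System.out.println(fixed);  // 750000000 -- the expected entry count
    }
}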
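In chanelChannelTransfer, the case where channel.transferFrom() copies fewer bytes than requested is only logged, not retried; FileChannel.transferFrom may legitimately return a short count, for example when the source channel has fewer bytes remaining. A retry loop along the following lines would make the copy either complete or fail loudly. This is a hypothetical helper sketched for illustration, not code from the patch.

import java.io.IOException;
import java.nio.channels.FileChannel;

final class ChannelCopy {
    // Copies exactly lengthB bytes from source (starting at sourcePosB) into dest
    // (starting at destPosB), retrying until done or no progress is made.
    static void transferFully(FileChannel dest, FileChannel source,
                              long sourcePosB, long destPosB, long lengthB) throws IOException {
        long copied = 0;
        while (copied < lengthB) {
            source.position(sourcePosB + copied);
            long n = dest.transferFrom(source, destPosB + copied, lengthB - copied);
            if (n <= 0)
                throw new IOException("Transfer stalled after " + copied + " of " + lengthB + " bytes");
            copied += n;
        }
    }
}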
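Since the subject line says this commit is about debugging, a standalone round trip over the new channel-backed write path can help localize the problem. The harness below is a hypothetical sketch: the class name and sizes are invented, and it relies only on methods visible in this patch or used elsewhere in these files (fromMmapReadWrite, set/get, force, close, transferFrom). Whether the mapping observes the channel writes immediately depends on the platform's mmap coherence, so treat it as a smoke test rather than a guarantee.

import nu.marginalia.array.page.UnsafeLongArray;

import java.lang.foreign.Arena;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferFromSmokeTest {
    public static void main(String[] args) throws Exception {
        final long size = 1_000_000; // longs, i.e. an 8 MB file

        Path sourceFile = Files.createTempFile("transfer-src", ".dat");
        Path destFile = Files.createTempFile("transfer-dst", ".dat");

        // Fill the source file through a mapped array so both sides share the
        // same native long layout.
        var src = UnsafeLongArray.fromMmapReadWrite(Arena.ofConfined(), sourceFile, 0, size);
        for (long i = 0; i < size; i++) {
            src.set(i, i * 31 + 7);
        }
        src.force();
        src.close(); // closes the arena and, after this patch, the backing channel

        // Copy it into a second mapped array via the new transferFrom() path.
        var dst = UnsafeLongArray.fromMmapReadWrite(Arena.ofConfined(), destFile, 0, size);
        try (var sourceChannel = FileChannel.open(sourceFile, StandardOpenOption.READ)) {
            dst.transferFrom(sourceChannel, 0, 0, size);
        }

        for (long i = 0; i < size; i++) {
            if (dst.get(i) != i * 31 + 7) {
                throw new AssertionError("Mismatch at index " + i);
            }
        }
        dst.close();

        System.out.println("transferFrom copied " + size + " longs correctly");
    }
}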