(array) Attempting to debug strange errors
This commit is contained in:
parent
5604e9f531
commit
67aa20ea2c
@ -30,7 +30,7 @@ public class ReverseIndexBTreeTransformer implements LongIOTransformer {
|
||||
@Override
|
||||
public long transform(long pos, long end) throws IOException {
|
||||
|
||||
final int size = (int) (end - start) / entrySize;
|
||||
final int size = (int) ((end - start) / entrySize);
|
||||
|
||||
if (size == 0) {
|
||||
return -1;
|
||||
|
@ -92,7 +92,8 @@ public class ReversePreindex {
|
||||
|
||||
LongArray wordIds = segments.wordIds;
|
||||
|
||||
assert offsets.size() == wordIds.size() : "Offsets and word-ids of different size";
|
||||
if (offsets.size() != wordIds.size())
|
||||
throw new IllegalStateException("Offsets and word-ids of different size");
|
||||
if (offsets.size() > Integer.MAX_VALUE) {
|
||||
throw new IllegalStateException("offsets.size() too big!");
|
||||
}
|
||||
@ -137,7 +138,7 @@ public class ReversePreindex {
|
||||
|
||||
Path docsFile = Files.createTempFile(destDir, "docs", ".dat");
|
||||
|
||||
LongArray mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, 2 * (left.documents.size() + right.documents.size()));
|
||||
LongArray mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, left.documents.size() + right.documents.size());
|
||||
|
||||
leftIter.next();
|
||||
rightIter.next();
|
||||
@ -180,9 +181,15 @@ public class ReversePreindex {
|
||||
|
||||
}
|
||||
|
||||
assert !leftIter.isPositionBeforeEnd() : "Left has more to go";
|
||||
assert !rightIter.isPositionBeforeEnd() : "Right has more to go";
|
||||
assert !mergingIter.canPutMore() : "Source iters ran dry before merging iter";
|
||||
if (leftIter.isPositionBeforeEnd())
|
||||
throw new IllegalStateException("Left has more to go");
|
||||
if (rightIter.isPositionBeforeEnd())
|
||||
throw new IllegalStateException("Right has more to go");
|
||||
if (mergingIter.canPutMore())
|
||||
throw new IllegalStateException("Source iters ran dry before merging iter");
|
||||
|
||||
|
||||
mergingSegment.force();
|
||||
|
||||
// We may have overestimated the size of the merged docs size in the case there were
|
||||
// duplicates in the data, so we need to shrink it to the actual size we wrote.
|
||||
@ -190,8 +197,6 @@ public class ReversePreindex {
|
||||
mergedDocuments = shrinkMergedDocuments(mergedDocuments,
|
||||
docsFile, 2 * mergingSegment.totalSize());
|
||||
|
||||
mergingSegment.force();
|
||||
|
||||
return new ReversePreindex(
|
||||
mergingSegment,
|
||||
new ReversePreindexDocuments(mergedDocuments, docsFile)
|
||||
@ -233,16 +238,15 @@ public class ReversePreindex {
|
||||
mergedDocuments.force();
|
||||
|
||||
long beforeSize = mergedDocuments.size();
|
||||
try (var bc = Files.newByteChannel(docsFile, StandardOpenOption.WRITE)) {
|
||||
bc.truncate(sizeLongs * 8);
|
||||
}
|
||||
long afterSize = mergedDocuments.size();
|
||||
mergedDocuments.close();
|
||||
|
||||
mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, sizeLongs);
|
||||
|
||||
long afterSize = sizeLongs * 8;
|
||||
if (beforeSize != afterSize) {
|
||||
mergedDocuments.close();
|
||||
try (var bc = Files.newByteChannel(docsFile, StandardOpenOption.WRITE)) {
|
||||
bc.truncate(sizeLongs * 8);
|
||||
}
|
||||
|
||||
logger.info("Shrunk {} from {}b to {}b", docsFile, beforeSize, afterSize);
|
||||
mergedDocuments = LongArrayFactory.mmapForWritingConfined(docsFile, sizeLongs);
|
||||
}
|
||||
|
||||
return mergedDocuments;
|
||||
@ -291,7 +295,8 @@ public class ReversePreindex {
|
||||
boolean putNext = mergingIter.putNext(size / 2);
|
||||
boolean iterNext = sourceIter.next();
|
||||
|
||||
assert putNext || !iterNext : "Source iterator ran out before dest iterator?!";
|
||||
if (!putNext && iterNext)
|
||||
throw new IllegalStateException("Source iterator ran out before dest iterator?!");
|
||||
|
||||
return iterNext;
|
||||
}
|
||||
|
@ -38,6 +38,9 @@ public class ReversePreindexWordSegments {
|
||||
* and each value is the start offset of the data.
|
||||
*/
|
||||
public Long2LongOpenHashMap asMap(int recordSize) {
|
||||
if (wordIds.size() > Integer.MAX_VALUE)
|
||||
throw new IllegalArgumentException("Cannot create a map with more than Integer.MAX_VALUE entries");
|
||||
|
||||
Long2LongOpenHashMap ret = new Long2LongOpenHashMap((int) wordIds.size(), 0.75f);
|
||||
var iter = iterator(recordSize);
|
||||
|
||||
@ -62,7 +65,7 @@ public class ReversePreindexWordSegments {
|
||||
|
||||
// Create the words file by iterating over the map and inserting them into
|
||||
// the words file in whatever bizarro hash table order they appear in
|
||||
int i = 0;
|
||||
long i = 0;
|
||||
LongIterator iter = countsMap.keySet().iterator();
|
||||
while (iter.hasNext()) {
|
||||
words.set(i++, iter.nextLong());
|
||||
@ -120,8 +123,8 @@ public class ReversePreindexWordSegments {
|
||||
this.fileSize = wordIds.size();
|
||||
}
|
||||
|
||||
private int i = -1;
|
||||
public int idx() {
|
||||
private long i = -1;
|
||||
public long idx() {
|
||||
return i;
|
||||
}
|
||||
public boolean next() {
|
||||
@ -166,8 +169,8 @@ public class ReversePreindexWordSegments {
|
||||
this.wordId = wordIds.get(0);
|
||||
}
|
||||
|
||||
private int i = 0;
|
||||
public int idx() {
|
||||
private long i = 0;
|
||||
public long idx() {
|
||||
return i;
|
||||
}
|
||||
|
||||
|
@ -154,7 +154,11 @@ public class SegmentLongArray implements PartitionPage, LongArray {
|
||||
@Override
|
||||
public void transferFrom(FileChannel source, long sourceStart, long arrayStart, long arrayEnd) throws IOException {
|
||||
|
||||
final int stride = 1024*1204*128; // Copy 1 GB at a time 'cause byte buffers are 'a byte buffering
|
||||
final int stride = 1024*1024*128; // Copy 1 GB at a time 'cause byte buffers are 'a byte buffering
|
||||
|
||||
if (source.size() / 8 < sourceStart + (arrayEnd - arrayStart)) {
|
||||
throw new IndexOutOfBoundsException(STR."Source channel too small: \{source.size()} < \{sourceStart + (arrayEnd - arrayStart)}");
|
||||
}
|
||||
|
||||
long ss = sourceStart;
|
||||
for (long as = arrayStart; as < arrayEnd; as += stride, ss += stride) {
|
||||
|
@ -2,8 +2,11 @@ package nu.marginalia.array.page;
|
||||
|
||||
import nu.marginalia.array.ArrayRangeReference;
|
||||
import nu.marginalia.array.LongArray;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
import javax.annotation.Nonnull;
|
||||
import javax.annotation.Nullable;
|
||||
import java.io.IOException;
|
||||
import java.lang.foreign.Arena;
|
||||
@ -12,7 +15,6 @@ import java.nio.ByteBuffer;
|
||||
import java.nio.LongBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.OpenOption;
|
||||
import java.nio.file.Path;
|
||||
import java.nio.file.StandardOpenOption;
|
||||
|
||||
@ -23,9 +25,13 @@ import static java.lang.foreign.ValueLayout.JAVA_LONG;
|
||||
public class UnsafeLongArray implements PartitionPage, LongArray {
|
||||
|
||||
private static final Unsafe unsafe = UnsafeProvider.getUnsafe();
|
||||
private static final Logger logger = LoggerFactory.getLogger(UnsafeLongArray.class);
|
||||
|
||||
@Nullable
|
||||
private final Arena arena;
|
||||
@Nullable
|
||||
private final FileChannel channel;
|
||||
|
||||
private final MemorySegment segment;
|
||||
private boolean closed;
|
||||
|
||||
@ -33,6 +39,15 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
|
||||
@Nullable Arena arena) {
|
||||
this.segment = segment;
|
||||
this.arena = arena;
|
||||
this.channel = null;
|
||||
}
|
||||
|
||||
UnsafeLongArray(MemorySegment segment,
|
||||
@Nonnull FileChannel channel,
|
||||
@Nullable Arena arena) {
|
||||
this.segment = segment;
|
||||
this.arena = arena;
|
||||
this.channel = channel;
|
||||
}
|
||||
|
||||
public static UnsafeLongArray onHeap(Arena arena, long size) {
|
||||
@ -40,38 +55,26 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
|
||||
}
|
||||
|
||||
public static UnsafeLongArray fromMmapReadOnly(Arena arena, Path file, long offset, long size) throws IOException {
|
||||
return new UnsafeLongArray(
|
||||
mmapFile(arena, file, offset, size, FileChannel.MapMode.READ_ONLY, StandardOpenOption.READ),
|
||||
arena);
|
||||
}
|
||||
|
||||
public static UnsafeLongArray fromMmapReadWrite(Arena arena, Path file, long offset, long size) throws IOException {
|
||||
|
||||
return new UnsafeLongArray(
|
||||
mmapFile(arena, file, offset, size, FileChannel.MapMode.READ_WRITE,
|
||||
StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE),
|
||||
arena);
|
||||
}
|
||||
|
||||
private static MemorySegment mmapFile(Arena arena,
|
||||
Path file,
|
||||
long offset,
|
||||
long size,
|
||||
FileChannel.MapMode mode,
|
||||
OpenOption... openOptions) throws IOException
|
||||
{
|
||||
try (var channel = (FileChannel) Files.newByteChannel(file, openOptions)) {
|
||||
|
||||
return channel.map(mode,
|
||||
JAVA_LONG.byteSize() * offset,
|
||||
JAVA_LONG.byteSize() * size,
|
||||
arena);
|
||||
try (var channel = (FileChannel) Files.newByteChannel(file, StandardOpenOption.READ)) {
|
||||
return new UnsafeLongArray(channel.map(FileChannel.MapMode.READ_ONLY,
|
||||
JAVA_LONG.byteSize() * offset, JAVA_LONG.byteSize() * size,
|
||||
arena), arena);
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new IOException("Failed to map file " + file + " (" + offset + ":" + size + ")", ex);
|
||||
}
|
||||
}
|
||||
|
||||
public static UnsafeLongArray fromMmapReadWrite(Arena arena, Path file, long offset, long size) throws IOException {
|
||||
var channel = (FileChannel) Files.newByteChannel(file,
|
||||
StandardOpenOption.READ, StandardOpenOption.WRITE, StandardOpenOption.CREATE);
|
||||
var segment = channel.map(FileChannel.MapMode.READ_WRITE,
|
||||
JAVA_LONG.byteSize() * offset, JAVA_LONG.byteSize() * size,
|
||||
arena);
|
||||
|
||||
return new UnsafeLongArray(segment, channel, arena);
|
||||
}
|
||||
|
||||
@Override
|
||||
public LongArray range(long start, long end) {
|
||||
return new UnsafeLongArray(
|
||||
@ -122,6 +125,15 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
|
||||
if (arena != null && !closed) {
|
||||
arena.close();
|
||||
}
|
||||
if (channel != null && !closed) {
|
||||
try {
|
||||
channel.close();
|
||||
}
|
||||
catch (IOException ex) {
|
||||
throw new RuntimeException("Failed to close channel", ex);
|
||||
}
|
||||
}
|
||||
|
||||
closed = true;
|
||||
}
|
||||
|
||||
@ -149,6 +161,13 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
|
||||
public void force() {
|
||||
if (segment.isMapped()) {
|
||||
segment.force();
|
||||
try {
|
||||
if (channel != null) {
|
||||
channel.force(false);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException("Failed to force channel", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -156,26 +175,102 @@ public class UnsafeLongArray implements PartitionPage, LongArray {
|
||||
return new ArrayRangeReference<>(this, start, end);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transferFrom(FileChannel source, long sourceStart, long arrayStart, long arrayEnd) throws IOException {
|
||||
public void chanelChannelTransfer(FileChannel source,
|
||||
long sourceStartL,
|
||||
long arrayStartL,
|
||||
long arrayEndL) throws IOException {
|
||||
|
||||
final int stride = 1024*1204*128; // Copy 1 GB at a time 'cause byte buffers are 'a byte buffering
|
||||
assert channel != null;
|
||||
|
||||
long ss = sourceStart;
|
||||
for (long as = arrayStart; as < arrayEnd; as += stride, ss += stride) {
|
||||
long ae = Math.min(as + stride, arrayEnd);
|
||||
final int B_per_L = (int) JAVA_LONG.byteSize();
|
||||
|
||||
long index = as * JAVA_LONG.byteSize();
|
||||
long length = (ae - as) * JAVA_LONG.byteSize();
|
||||
final int strideB = 128*1024*1024; // Copy in 128 MB chunks
|
||||
|
||||
var bufferSlice = segment.asSlice(index, length).asByteBuffer();
|
||||
final long destStartB = arrayStartL * B_per_L;
|
||||
final long destEndB = arrayEndL * B_per_L;
|
||||
final long lengthB = destEndB - destStartB;
|
||||
|
||||
long startPos = ss * JAVA_LONG.byteSize();
|
||||
while (bufferSlice.position() < bufferSlice.capacity()) {
|
||||
source.read(bufferSlice, startPos + bufferSlice.position());
|
||||
final long sourceStartB = sourceStartL * B_per_L;
|
||||
final long sourceEndB = sourceStartB + lengthB;
|
||||
|
||||
|
||||
if (sourceStartB > sourceEndB)
|
||||
throw new IndexOutOfBoundsException("Source start after end");
|
||||
if (sourceStartB > source.size())
|
||||
throw new IndexOutOfBoundsException("Source channel too small, start " + sourceStartB + " < input size " + source.size());
|
||||
if (sourceEndB > source.size())
|
||||
throw new IndexOutOfBoundsException("Source channel too small, end " + sourceEndB + " < input size " + source.size());
|
||||
|
||||
long destIndexB = destStartB;
|
||||
|
||||
source.position(sourceStartB);
|
||||
|
||||
while (destIndexB < destEndB)
|
||||
{
|
||||
long stepSizeB = Math.min(destIndexB + strideB, destEndB);
|
||||
long copyLengthB = (stepSizeB - destIndexB);
|
||||
|
||||
long transferred = channel.transferFrom(source, destIndexB, copyLengthB);
|
||||
if (transferred != copyLengthB) {
|
||||
logger.warn("Less than {} bytes were copied: {}", copyLengthB, transferred);
|
||||
}
|
||||
|
||||
destIndexB += copyLengthB;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transferFrom(FileChannel source,
|
||||
long sourceStartL,
|
||||
long arrayStartL,
|
||||
long arrayEndL) throws IOException {
|
||||
|
||||
|
||||
if (channel != null) {
|
||||
chanelChannelTransfer(source, sourceStartL, arrayStartL, arrayEndL);
|
||||
return;
|
||||
}
|
||||
|
||||
final int B_per_L = (int) JAVA_LONG.byteSize();
|
||||
|
||||
final int strideB = 1024*1024*1024; // Copy 1 GB at a time
|
||||
|
||||
final long arrayStartB = arrayStartL * B_per_L;
|
||||
final long arrayEndB = arrayEndL * B_per_L;
|
||||
final long arrayLengthB = arrayEndB - arrayStartB;
|
||||
|
||||
final long sourceStartB = sourceStartL * B_per_L;
|
||||
final long sourceEndB = sourceStartB + arrayLengthB;
|
||||
|
||||
|
||||
if (sourceStartB > sourceEndB)
|
||||
throw new IndexOutOfBoundsException("Source start after end");
|
||||
if (sourceStartB > source.size())
|
||||
throw new IndexOutOfBoundsException("Source channel too small, start " + sourceStartB + " < input size " + source.size());
|
||||
if (sourceEndB > source.size())
|
||||
throw new IndexOutOfBoundsException("Source channel too small, end " + sourceEndB + " < input size " + source.size());
|
||||
|
||||
long channelIndexB = sourceStartB;
|
||||
long segmentIndexB = arrayStartB;
|
||||
|
||||
while (segmentIndexB < arrayEndB)
|
||||
{
|
||||
long segmentEndB = Math.min(segmentIndexB + strideB, arrayEndB);
|
||||
long lengthB = (segmentEndB - segmentIndexB);
|
||||
|
||||
var bufferSlice = segment.asSlice(segmentIndexB, lengthB).asByteBuffer();
|
||||
|
||||
while (bufferSlice.position() < bufferSlice.capacity()) {
|
||||
if (source.position() + bufferSlice.capacity() > sourceEndB)
|
||||
throw new IndexOutOfBoundsException("Source channel too small");
|
||||
|
||||
if (source.read(bufferSlice, channelIndexB + bufferSlice.position()) < 0)
|
||||
throw new IOException("Failed to read from source");
|
||||
}
|
||||
|
||||
channelIndexB += lengthB;
|
||||
segmentIndexB += lengthB;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user