(index-construction) Make random-write file strategy configurable
Writing large files out of order needs some strategy to avoid issuing the writes straight to disk, since doing so causes severe disk thrashing. By default, the data is buffered in RAM. This works well on a large server, but smaller systems struggle. To let systems with little RAM process large amounts of data, the old RandomWriteFunnel is brought back when the system property 'system.conserve-memory' is set to true. RandomWriteFunnel buffers the construction in a series of small files that pigeonhole the writes into rough neighborhoods, then replays those files one by one to construct one region of the output at a time. This is relatively slow and temporarily needs more than twice the final file's size on disk. A new interface, RandomFileAssembler, is introduced as an abstraction over this operation. A third strategy, a direct mmap, is used when the file is small (less than 1 GB); in that range disk thrashing is unlikely, since the mapping fits comfortably in RAM.
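The diff below shows the strategy selection but not RandomWriteFunnel's internals, so here is a rough self-contained sketch of the pigeonhole idea described above. The class name, record format, and bin sizing are invented for illustration and do not reflect the actual nu.marginalia.rwf implementation:

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.file.Files;
import java.nio.file.Path;

/** Toy pigeonhole funnel: append (address, value) records to per-neighborhood
 *  temp files, then replay them bin by bin so each pass touches only one
 *  region of the destination file, small enough to sit in the page cache. */
class PigeonholeFunnelSketch implements AutoCloseable {
    private final long binSizeLongs;           // addresses covered per bin (assumed granularity)
    private final Path[] binFiles;
    private final DataOutputStream[] bins;

    PigeonholeFunnelSketch(Path workDir, long totalSizeLongs, long binSizeLongs) throws IOException {
        this.binSizeLongs = binSizeLongs;
        int n = (int) ((totalSizeLongs + binSizeLongs - 1) / binSizeLongs);
        binFiles = new Path[n];
        bins = new DataOutputStream[n];
        for (int i = 0; i < n; i++) {
            binFiles[i] = Files.createTempFile(workDir, "bin" + i, ".dat");
            bins[i] = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(binFiles[i])));
        }
    }

    /** Out-of-order write: the record lands in the bin for its neighborhood. */
    void put(long address, long data) throws IOException {
        var bin = bins[(int) (address / binSizeLongs)];
        bin.writeLong(address);
        bin.writeLong(data);
    }

    /** Replay each bin in turn; within a bin, all writes stay local. */
    void write(Path dest) throws IOException {
        try (var out = new RandomAccessFile(dest.toFile(), "rw")) {
            for (int i = 0; i < bins.length; i++) {
                bins[i].close();
                try (var in = new DataInputStream(
                        new BufferedInputStream(Files.newInputStream(binFiles[i])))) {
                    while (in.available() > 0) {
                        long address = in.readLong();
                        out.seek(address * Long.BYTES);
                        out.writeLong(in.readLong());
                    }
                }
            }
        }
    }

    @Override
    public void close() throws IOException {
        for (int i = 0; i < bins.length; i++) {
            bins[i].close(); // no-op if already closed during write()
            Files.deleteIfExists(binFiles[i]);
        }
    }
}

The real class appears to use bins of 10,000,000 entries, judging by the RandomWriteFunnel constructor call in ofTempFiles further down.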
parent 6dcc20038c
commit 53c575db3f
@@ -41,14 +41,14 @@ public class ReversePreindex {
      */
     public static ReversePreindex constructPreindex(IndexJournalReader reader,
                                                     DocIdRewriter docIdRewriter,
-                                                    Path destDir) throws IOException
+                                                    Path workDir) throws IOException
     {
-        Path segmentWordsFile = Files.createTempFile(destDir, "segment_words", ".dat");
-        Path segmentCountsFile = Files.createTempFile(destDir, "segment_counts", ".dat");
-        Path docsFile = Files.createTempFile(destDir, "docs", ".dat");
+        Path segmentWordsFile = Files.createTempFile(workDir, "segment_words", ".dat");
+        Path segmentCountsFile = Files.createTempFile(workDir, "segment_counts", ".dat");
+        Path docsFile = Files.createTempFile(workDir, "docs", ".dat");
 
         var segments = ReversePreindexWordSegments.construct(reader, segmentWordsFile, segmentCountsFile);
-        var docs = ReversePreindexDocuments.construct(docsFile, reader, docIdRewriter, segments);
+        var docs = ReversePreindexDocuments.construct(docsFile, workDir, reader, docIdRewriter, segments);
 
         return new ReversePreindex(segments, docs);
     }
@@ -4,6 +4,7 @@ import lombok.SneakyThrows;
 import nu.marginalia.array.LongArray;
 import nu.marginalia.array.LongArrayFactory;
 import nu.marginalia.index.journal.reader.IndexJournalReader;
+import nu.marginalia.rwf.RandomFileAssembler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -23,7 +24,7 @@ public class ReversePreindexDocuments {
     final Path file;
     public final LongArray documents;
     private static final int RECORD_SIZE_LONGS = 2;
-    private static final Logger logger= LoggerFactory.getLogger(ReversePreindexDocuments.class);
+    private static final Logger logger = LoggerFactory.getLogger(ReversePreindexDocuments.class);
 
     public ReversePreindexDocuments(LongArray documents, Path file) {
         this.documents = documents;
@@ -32,11 +33,12 @@ public class ReversePreindexDocuments {
 
     public static ReversePreindexDocuments construct(
             Path docsFile,
+            Path workDir,
             IndexJournalReader reader,
             DocIdRewriter docIdRewriter,
             ReversePreindexWordSegments segments) throws IOException {
 
-        createUnsortedDocsFile(docsFile, reader, segments, docIdRewriter);
+        createUnsortedDocsFile(docsFile, workDir, reader, segments, docIdRewriter);
 
         LongArray docsFileMap = LongArrayFactory.mmapForModifyingShared(docsFile);
         sortDocsFile(docsFileMap, segments);
@@ -58,12 +60,14 @@ public class ReversePreindexDocuments {
     }
 
     private static void createUnsortedDocsFile(Path docsFile,
+                                               Path workDir,
                                                IndexJournalReader reader,
                                                ReversePreindexWordSegments segments,
                                                DocIdRewriter docIdRewriter) throws IOException {
-        long fileSize = RECORD_SIZE_LONGS * segments.totalSize();
-
-        try (LongArray outArray = LongArrayFactory.onHeapConfined(fileSize)) {
+        long fileSizeLongs = RECORD_SIZE_LONGS * segments.totalSize();
+
+        try (RandomFileAssembler assembly = RandomFileAssembler.create(workDir, fileSizeLongs)) {
+
 
         var offsetMap = segments.asMap(RECORD_SIZE_LONGS);
         offsetMap.defaultReturnValue(0);
@@ -77,12 +81,12 @@ public class ReversePreindexDocuments {
 
                 long offset = offsetMap.addTo(wordId, RECORD_SIZE_LONGS);
 
-                outArray.set(offset + 0, rankEncodedId);
-                outArray.set(offset + 1, wordMeta);
+                assembly.put(offset + 0, rankEncodedId);
+                assembly.put(offset + 1, wordMeta);
             }
         }
 
-        outArray.write(docsFile);
+        assembly.write(docsFile);
     }
 }
 
@@ -1,6 +1,5 @@
 package nu.marginalia.index.construction;
 
-import nu.marginalia.array.algo.SortingContext;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -54,7 +53,7 @@ class ReversePreindexDocsTest {
         );
 
         var segments = ReversePreindexWordSegments.construct(reader, wordsIdFile, countsFile);
-        var docs = ReversePreindexDocuments.construct(docsFile, reader, DocIdRewriter.identity(), segments);
+        var docs = ReversePreindexDocuments.construct(docsFile, tempDir, reader, DocIdRewriter.identity(), segments);
 
         List<TestSegmentData> expected = List.of(
                 new TestSegmentData(-100, 0, 2, new long[] { -0xF00BA3L, 0 }),
@@ -83,7 +82,7 @@ class ReversePreindexDocsTest {
         );
 
         var segments = ReversePreindexWordSegments.construct(reader, wordsIdFile, countsFile);
-        var docs = ReversePreindexDocuments.construct(docsFile, reader, DocIdRewriter.identity(), segments);
+        var docs = ReversePreindexDocuments.construct(docsFile, tempDir, reader, DocIdRewriter.identity(), segments);
 
         List<TestSegmentData> expected = List.of(
                 new TestSegmentData(4, 0, 4, new long[] { -0xF00BA3L, 0, -0xF00BA3L, 0 })
@@ -109,7 +108,7 @@ class ReversePreindexDocsTest {
         );
 
         var segments = ReversePreindexWordSegments.construct(reader, wordsIdFile, countsFile);
-        var docs = ReversePreindexDocuments.construct(docsFile, reader, DocIdRewriter.identity(), segments);
+        var docs = ReversePreindexDocuments.construct(docsFile, tempDir, reader, DocIdRewriter.identity(), segments);
 
         List<TestSegmentData> expected = List.of(
                 new TestSegmentData(-100, 0, 4, new long[] { -0xF00BA3L, 0, 0xF00BA4L, 0 }),
@@ -9,6 +9,8 @@ java {
 }
 
 dependencies {
+    implementation project(':code:libraries:array')
+
     implementation libs.bundles.slf4j
 
     testImplementation libs.bundles.slf4j.test
@@ -0,0 +1,125 @@
+package nu.marginalia.rwf;
+
+import nu.marginalia.array.LongArray;
+import nu.marginalia.array.LongArrayFactory;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.StandardOpenOption;
+
+/** A RandomFileAssembler is a way to write a large file out of order
+ * in a way that is efficient for SSDs.
+ */
+public interface RandomFileAssembler extends AutoCloseable {
+
+    void put(long address, long data) throws IOException;
+    void write(Path file) throws IOException;
+    void close() throws IOException;
+
+
+    /** Select the appropriate RandomFileAssembler implementation based on
+     * the system configuration.
+     */
+    static RandomFileAssembler create(Path workDir,
+                                      long totalSize) throws IOException {
+        // If the system is configured to conserve memory, we use temp files
+        if (Boolean.getBoolean("system.conserve-memory")) {
+            return ofTempFiles(workDir);
+        }
+
+        // If the file is small, we use straight mmap
+        if (totalSize < 128_000_000) { // 128M longs = 1 GB
+            return ofMmap(workDir, totalSize);
+        }
+
+        // If the file is large, we use an in-memory buffer to avoid disk thrashing
+        return ofInMemoryAssembly(totalSize);
+    }
+
+
+    /** Create a RandomFileAssembler that writes to a series of small files.
+     * This has negligible memory overhead, but is slower than in-memory
+     * or mmap for small files.
+     */
+    static RandomFileAssembler ofTempFiles(Path workDir) throws IOException {
+        return new RandomFileAssembler() {
+            private final RandomWriteFunnel funnel = new RandomWriteFunnel(workDir, 10_000_000);
+
+            @Override
+            public void put(long address, long data) throws IOException {
+                funnel.put(address, data);
+            }
+
+            @Override
+            public void write(Path file) throws IOException {
+                try (var channel = Files.newByteChannel(file, StandardOpenOption.WRITE, StandardOpenOption.CREATE)) {
+                    funnel.write(channel);
+                }
+            }
+
+            @Override
+            public void close() throws IOException {
+                funnel.close();
+            }
+        };
+    }
+
+    /** Create a RandomFileAssembler that writes to a LongArray in memory. */
+    static RandomFileAssembler ofInMemoryAssembly(long size) {
+        return new RandomFileAssembler() {
+            private final LongArray buffer = LongArrayFactory.onHeapConfined(size);
+
+            @Override
+            public void put(long address, long data) {
+                buffer.set(address, data);
+            }
+
+            @Override
+            public void write(Path file) throws IOException {
+                buffer.write(file);
+            }
+
+            @Override
+            public void close() {
+                buffer.close();
+            }
+        };
+    }
+
+    /** Create a RandomFileAssembler that writes to a file using mmap.
+     * This is the fastest method for small files, but has a large memory
+     * overhead and is slow for large files, where the OS will start pushing
+     * changes to disk continuously.
+     */
+    static RandomFileAssembler ofMmap(Path destDir, long size) throws IOException {
+        return new RandomFileAssembler() {
+            private final Path workFile = Files.createTempFile(destDir, "mmap", ".dat");
+            private final LongArray buffer = LongArrayFactory.mmapForWritingConfined(workFile, size);
+
+            @Override
+            public void put(long address, long data) {
+                buffer.set(address, data);
+            }
+
+            @Override
+            public void write(Path dest) throws IOException {
+                buffer.force();
+
+                Files.move(workFile, dest,
+                        StandardCopyOption.REPLACE_EXISTING,
+                        StandardCopyOption.ATOMIC_MOVE);
+            }
+
+            @Override
+            public void close() throws IOException {
+                buffer.close();
+
+                // Catch the case where e.g. write() fails with an exception and workFile doesn't get moved
+                Files.deleteIfExists(workFile);
+            }
+        };
+    }
+}
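For reference, a condensed usage sketch of the new interface, mirroring the createUnsortedDocsFile change above; the demo class, paths, and sizes here are hypothetical:

import java.nio.file.Files;
import java.nio.file.Path;

import nu.marginalia.rwf.RandomFileAssembler;

class AssemblerDemo { // hypothetical demo, not part of the commit
    public static void main(String[] args) throws Exception {
        Path workDir = Files.createTempDirectory("assembler-demo");
        Path docsFile = workDir.resolve("docs.dat");

        long fileSizeLongs = 2 * 1024; // well under 128M longs, so create() picks the mmap strategy

        try (RandomFileAssembler assembly = RandomFileAssembler.create(workDir, fileSizeLongs)) {
            assembly.put(1_000, 42L); // writes may arrive in any order
            assembly.put(7, 1L);
            assembly.write(docsFile); // materialize the assembled file
        }
    }
}

Because create() switches on 'system.conserve-memory' and the requested size, call sites like createUnsortedDocsFile stay identical across all three backends.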
@@ -26,7 +26,7 @@ public class RandomWriteFunnel implements AutoCloseable {
     private final Path tempDir;
     private final int binSize;
 
-    public RandomWriteFunnel(Path tempDir, int binSize) throws IOException {
+    RandomWriteFunnel(Path tempDir, int binSize) throws IOException {
         this.binSize = binSize;
         this.tempDir = tempDir;
 