From f76af4ca79e8d8b84deb2883824d69a568ba62b6 Mon Sep 17 00:00:00 2001 From: vlofgren Date: Sat, 18 Jun 2022 15:54:58 +0200 Subject: [PATCH] Refactoring conversion --- .../java/nu/marginalia/util/ListChunker.java | 31 ++ .../nu/marginalia/util/RandomWriteFunnel.java | 50 +-- .../util/multimap/MultimapFileLong.java | 53 ++- .../multimap/MultimapFileLongOffsetSlice.java | 3 + .../util/multimap/MultimapFileLongSlice.java | 2 + .../wmsa/client/AbstractClient.java | 8 +- .../loader/SqlLoadProcessedDocument.java | 2 - .../CrawlJobExtractorPageRankMain.java | 6 +- .../edge/data/dao/EdgeDataStoreDaoImpl.java | 15 +- .../data/dao/task/EdgeDomainBlacklist.java | 2 +- .../wmsa/edge/index/EdgeIndexBucket.java | 6 +- .../wmsa/edge/index/EdgeIndexControl.java | 6 +- .../wmsa/edge/index/EdgeIndexService.java | 59 ++- .../wmsa/edge/index/IndexServicesFactory.java | 27 +- .../conversion/SearchIndexConverter.java | 367 ++++++------------ .../conversion/SearchIndexPartitioner.java | 2 +- .../conversion/SearchIndexPreconverter.java | 65 +--- .../words/WordIndexOffsetsTable.java | 6 +- .../journal/SearchIndexJournalEntry.java | 49 +++ .../SearchIndexJournalEntryHeader.java | 16 + .../journal/SearchIndexJournalFileHeader.java | 4 + .../journal/SearchIndexJournalReader.java | 123 ++++++ .../journal/SearchIndexJournalWriter.java | 10 + ...java => SearchIndexJournalWriterImpl.java} | 36 +- .../edge/index/journal/SearchIndexWriter.java | 16 - .../wmsa/edge/index/reader/SearchIndexes.java | 8 +- .../nu/marginalia/wmsa/edge/model/EdgeId.java | 11 +- .../model/search/EdgeSearchResultItem.java | 2 +- .../wmsa/edge/search/EdgeSearchOperator.java | 2 +- .../command/commands/SiteSearchCommand.java | 4 +- .../search/results/SearchResultDecorator.java | 4 +- .../siteinfo/DomainInformationService.java | 21 +- .../index/service/EdgeIndexClientTest.java | 5 +- .../service/SearchIndexJournalWriterTest.java | 76 ++++ .../index/service/SearchIndexWriterTest.java | 90 ----- .../service/util/RandomWriteFunnelTest.java | 33 ++ .../com/upserve/uppend/blobs/NativeIO.java | 3 - 37 files changed, 658 insertions(+), 565 deletions(-) create mode 100644 marginalia_nu/src/main/java/nu/marginalia/util/ListChunker.java create mode 100644 marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntry.java create mode 100644 marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntryHeader.java create mode 100644 marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalFileHeader.java create mode 100644 marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalReader.java create mode 100644 marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriter.java rename marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/{SearchIndexWriterImpl.java => SearchIndexJournalWriterImpl.java} (68%) delete mode 100644 marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriter.java create mode 100644 marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexJournalWriterTest.java delete mode 100644 marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexWriterTest.java diff --git a/marginalia_nu/src/main/java/nu/marginalia/util/ListChunker.java b/marginalia_nu/src/main/java/nu/marginalia/util/ListChunker.java new file mode 100644 index 00000000..ef27ba1d --- /dev/null +++ b/marginalia_nu/src/main/java/nu/marginalia/util/ListChunker.java @@ -0,0 +1,31 @@ +package nu.marginalia.util; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +public class ListChunker { + + /** Chops data into a list of lists of max length size + * + * Caveat: Relies on subList and does not clone "data", so + * changes to the original list may affect the sub-lists + * in unspecified ways + * + * @see List#subList + */ + public static List> chopList(List data, int size) { + if (data.isEmpty()) + return Collections.emptyList(); + else if (data.size() < size) + return List.of(data); + + final List> ret = new ArrayList<>(1 + data.size() / size); + + for (int i = 0; i < data.size(); i+=size) { + ret.add(data.subList(i, Math.min(data.size(), i+size))); + } + + return ret; + } +} diff --git a/marginalia_nu/src/main/java/nu/marginalia/util/RandomWriteFunnel.java b/marginalia_nu/src/main/java/nu/marginalia/util/RandomWriteFunnel.java index 55c83464..0c274c2b 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/util/RandomWriteFunnel.java +++ b/marginalia_nu/src/main/java/nu/marginalia/util/RandomWriteFunnel.java @@ -1,6 +1,6 @@ package nu.marginalia.util; -import io.prometheus.client.Gauge; +import lombok.SneakyThrows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,10 +18,6 @@ import java.nio.file.Path; * */ public class RandomWriteFunnel implements AutoCloseable { - private final static Gauge write_rate = Gauge.build("wmsa_rwf_write_bytes", "Bytes/s") - .register(); - private final static Gauge transfer_rate = Gauge.build("wmsa_rwf_transfer_bytes", "Bytes/s") - .register(); private static final Logger logger = LoggerFactory.getLogger(RandomWriteFunnel.class); private final DataBin[] bins; @@ -34,7 +30,7 @@ public class RandomWriteFunnel implements AutoCloseable { int binCount = (int) (size / binSize + ((size % binSize) != 0L ? 1 : 0)); bins = new DataBin[binCount]; for (int i = 0; i < binCount; i++) { - bins[i] = new DataBin(tempDir, (int) Math.min(size - binSize * i, binSize)); + bins[i] = new DataBin(tempDir, Math.min((int) (size - binSize * i), binSize)); } } else { @@ -42,25 +38,25 @@ public class RandomWriteFunnel implements AutoCloseable { } } - public void put(long address, long data) throws IOException { - bins[((int)(address / binSize))].put((int)(address%binSize), data); + @SneakyThrows + public void put(long address, long data) { + int bin = (int)(address / binSize); + int offset = (int)(address%binSize); + + bins[bin].put(offset, data); } public void write(FileChannel o) throws IOException { ByteBuffer buffer = ByteBuffer.allocateDirect(binSize*8); - logger.debug("Writing from RWF"); - for (int i = 0; i < bins.length; i++) { - var bin = bins[i]; + for (var bin : bins) { buffer.clear(); bin.eval(buffer); while (buffer.hasRemaining()) { - int wb = o.write(buffer); - write_rate.set(wb); + o.write(buffer); } } - logger.debug("Done"); } @Override @@ -84,12 +80,12 @@ public class RandomWriteFunnel implements AutoCloseable { } void put(int address, long data) throws IOException { - buffer.putInt(address); - buffer.putLong(data); - - if (buffer.capacity() - buffer.position() < 12) { + if (buffer.remaining() < 12) { flushBuffer(); } + + buffer.putInt(address); + buffer.putLong(data); } private void flushBuffer() throws IOException { @@ -97,12 +93,15 @@ public class RandomWriteFunnel implements AutoCloseable { return; buffer.flip(); - while (channel.write(buffer) > 0); + while (buffer.hasRemaining()) + channel.write(buffer); + buffer.clear(); } private void eval(ByteBuffer dest) throws IOException { flushBuffer(); + channel.force(false); channel.position(0); buffer.clear(); @@ -117,14 +116,17 @@ public class RandomWriteFunnel implements AutoCloseable { if (rb < 0) { break; } - else { - transfer_rate.set(rb); - } buffer.flip(); while (buffer.limit() - buffer.position() >= 12) { - int addr = buffer.getInt(); + int addr = 8 * buffer.getInt(); long data = buffer.getLong(); - dest.putLong(8*addr, data); + + try { + dest.putLong(addr, data); + } + catch (IndexOutOfBoundsException ex) { + logger.info("!!!bad[{}]={}", addr, data); + } } buffer.compact(); } diff --git a/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLong.java b/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLong.java index f381a977..e9a9b4fe 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLong.java +++ b/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLong.java @@ -36,9 +36,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { private long mappedSize; final static long WORD_SIZE = 8; - private boolean loadAggressively; - - private final NativeIO.Advice advice = null; + private NativeIO.Advice defaultAdvice = null; public static MultimapFileLong forReading(Path file) throws IOException { long fileSize = Files.size(file); @@ -70,12 +68,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { long mapSize, int bufferSize) throws IOException { - this(new RandomAccessFile(file, translateToRAFMode(mode)), mode, mapSize, bufferSize, false); - } - - public MultimapFileLong loadAggressively(boolean v) { - this.loadAggressively = v; - return this; + this(new RandomAccessFile(file, translateToRAFMode(mode)), mode, mapSize, bufferSize); } private static String translateToRAFMode(FileChannel.MapMode mode) { @@ -91,13 +84,11 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { public MultimapFileLong(RandomAccessFile file, FileChannel.MapMode mode, long mapSizeBytes, - int bufferSizeWords, - boolean loadAggressively) throws IOException { + int bufferSizeWords) throws IOException { this.mode = mode; this.bufferSize = bufferSizeWords; this.mapSize = mapSizeBytes; this.fileLength = file.length(); - this.loadAggressively = loadAggressively; channel = file.getChannel(); mappedSize = 0; @@ -115,6 +106,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { @SneakyThrows public void advice(NativeIO.Advice advice) { + this.defaultAdvice = advice; for (var buffer : mappedByteBuffers) { NativeIO.madvise(buffer, advice); } @@ -157,7 +149,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { } @SneakyThrows - private void grow(long posIdxRequired) { + public void grow(long posIdxRequired) { if (posIdxRequired*WORD_SIZE > mapSize && mode == READ_ONLY) { throw new IndexOutOfBoundsException(posIdxRequired + " (max " + mapSize + ")"); } @@ -182,11 +174,8 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { var buffer = channel.map(mode, posBytes, bzBytes); - if (loadAggressively) - buffer.load(); - - if (advice != null) { - NativeIO.madvise(buffer, advice); + if (defaultAdvice != null) { + NativeIO.madvise(buffer, defaultAdvice); } buffers.add(buffer.asLongBuffer()); @@ -262,6 +251,32 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { } + @Override + public void read(LongBuffer vals, long idx) { + int n = vals.limit() - vals.position(); + if (idx+n >= mappedSize) { + grow(idx+n); + } + int iN = (int)((idx + n) / bufferSize); + + for (int i = 0; i < n; ) { + int i0 = (int)((idx + i) / bufferSize); + + int bufferOffset = (int) ((idx+i) % bufferSize); + var buffer = buffers.get(i0); + + final int l; + + if (i0 < iN) l = bufferSize - bufferOffset; + else l = Math.min(n - i, bufferSize - bufferOffset); + + vals.put(vals.position() + i, buffer, bufferOffset, l); + i+=l; + } + + } + + @Override public void write(long[] vals, long idx) { write(vals, vals.length, idx); @@ -363,8 +378,10 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice { @Override public void close() throws IOException { force(); + mappedByteBuffers.clear(); buffers.clear(); + channel.close(); // I want to believe diff --git a/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongOffsetSlice.java b/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongOffsetSlice.java index c2630ddc..bd35bd9b 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongOffsetSlice.java +++ b/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongOffsetSlice.java @@ -38,6 +38,9 @@ public class MultimapFileLongOffsetSlice implements MultimapFileLongSlice { map.read(vals, n, idx+off); } + @Override + public void read(LongBuffer vals, long idx) { map.read(vals, idx+off); } + @Override public void write(long[] vals, long idx) { map.write(vals, idx+off); diff --git a/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongSlice.java b/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongSlice.java index abf29f51..27d6ae06 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongSlice.java +++ b/marginalia_nu/src/main/java/nu/marginalia/util/multimap/MultimapFileLongSlice.java @@ -15,6 +15,8 @@ public interface MultimapFileLongSlice { void read(long[] vals, int n, long idx); + void read(LongBuffer vals, long idx); + void write(long[] vals, long idx); void write(long[] vals, int n, long idx); diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/client/AbstractClient.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/client/AbstractClient.java index 5091b75e..603f57e5 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/client/AbstractClient.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/client/AbstractClient.java @@ -6,6 +6,7 @@ import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.plugins.RxJavaPlugins; import lombok.SneakyThrows; +import marcono1234.gson.recordadapter.RecordTypeAdapterFactory; import nu.marginalia.wmsa.client.exception.LocalException; import nu.marginalia.wmsa.client.exception.NetworkException; import nu.marginalia.wmsa.client.exception.RemoteException; @@ -30,9 +31,12 @@ import java.util.zip.GZIPOutputStream; public abstract class AbstractClient implements AutoCloseable { public static final String CONTEXT_OUTBOUND_REQUEST = "outbound-request"; - private final Gson gson = new GsonBuilder().create(); + + private final Gson gson = new GsonBuilder() + .registerTypeAdapterFactory(RecordTypeAdapterFactory.builder().allowMissingComponentValues().create()) + .create(); + private final Logger logger = LoggerFactory.getLogger(getClass()); - private final Marker httpMarker = MarkerFactory.getMarker("HTTP"); private final OkHttpClient client; diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/converting/loader/SqlLoadProcessedDocument.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/converting/loader/SqlLoadProcessedDocument.java index e2e25fff..fb8a6303 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/converting/loader/SqlLoadProcessedDocument.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/converting/loader/SqlLoadProcessedDocument.java @@ -94,8 +94,6 @@ public class SqlLoadProcessedDocument { } catch (SQLException ex) { logger.warn("SQL error inserting document", ex); } - - } public void loadWithError(LoaderData data, List documents) { diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlJobExtractorPageRankMain.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlJobExtractorPageRankMain.java index ea1946fc..ef3bf39f 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlJobExtractorPageRankMain.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/crawling/CrawlJobExtractorPageRankMain.java @@ -105,7 +105,7 @@ public class CrawlJobExtractorPageRankMain { try (var domainQuery = conn.prepareStatement(specificDomainSqlFromId); var urlQuery = conn.prepareStatement(urlsSql)) { - domainQuery.setInt(1, domainId.getId()); + domainQuery.setInt(1, domainId.id()); ResultSet rsp = domainQuery.executeQuery(); domainName = rsp.next() ? rsp.getString(1) : ""; @@ -113,10 +113,10 @@ public class CrawlJobExtractorPageRankMain { spec.id = createId(new EdgeDomain(domainName)); spec.urls = new ArrayList<>(1000); - spec.crawlDepth = getCrawlDepth(new DomainWithId(domainName, domainId.getId())); + spec.crawlDepth = getCrawlDepth(new DomainWithId(domainName, domainId.id())); urlQuery.setString(1, domainName.toString()); - urlQuery.setInt(2, domainId.getId()); + urlQuery.setInt(2, domainId.id()); urlQuery.setFetchSize(1000); rsp = urlQuery.executeQuery(); diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/EdgeDataStoreDaoImpl.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/EdgeDataStoreDaoImpl.java index 30ea2256..c73089b0 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/EdgeDataStoreDaoImpl.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/EdgeDataStoreDaoImpl.java @@ -17,13 +17,8 @@ import nu.marginalia.wmsa.edge.search.model.BrowseResult; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; import java.sql.SQLException; -import java.sql.Types; import java.util.*; -import java.util.function.Function; -import java.util.regex.Pattern; -import java.util.stream.Collectors; public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao { @@ -71,7 +66,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao { private String idList(List> ids) { StringJoiner j = new StringJoiner(",", "(", ")"); for (var id : ids) { - j.add(Integer.toString(id.getId())); + j.add(Integer.toString(id.id())); } return j.toString(); } @@ -154,7 +149,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement(q)) { stmt.setFetchSize(count); - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); stmt.setInt(2, count); var rsp = stmt.executeQuery(); while (rsp.next()) { @@ -183,7 +178,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao { try (var stmt = connection.prepareStatement(q2)) { stmt.setFetchSize(count/2); - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); stmt.setInt(2, count/2 - domains.size()); var rsp = stmt.executeQuery(); while (rsp.next() && domains.size() < count/2) { @@ -214,7 +209,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao { LIMIT ?"""; try (var stmt = connection.prepareStatement(q3)) { stmt.setFetchSize(count/2); - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); stmt.setInt(2, count/2 - domains.size()); var rsp = stmt.executeQuery(); @@ -275,7 +270,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT DOMAIN_NAME FROM EC_DOMAIN WHERE ID=?")) { - stmt.setInt(1, id.getId()); + stmt.setInt(1, id.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return new EdgeDomain(rsp.getString(1)); diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/task/EdgeDomainBlacklist.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/task/EdgeDomainBlacklist.java index fa1899b1..df265a5f 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/task/EdgeDomainBlacklist.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/data/dao/task/EdgeDomainBlacklist.java @@ -9,7 +9,7 @@ import nu.marginalia.wmsa.edge.model.EdgeId; public interface EdgeDomainBlacklist { boolean isBlacklisted(int domainId); default boolean isBlacklisted(EdgeId domainId) { - return isBlacklisted(domainId.getId()); + return isBlacklisted(domainId.id()); } default TIntHashSet getSpamDomains() { return new TIntHashSet(); diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexBucket.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexBucket.java index 05bcfe75..09890252 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexBucket.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexBucket.java @@ -1,11 +1,9 @@ package nu.marginalia.wmsa.edge.index; -import nu.marginalia.wmsa.edge.index.EdgeIndexControl; -import nu.marginalia.wmsa.edge.index.IndexServicesFactory; import nu.marginalia.wmsa.edge.index.model.EdgeIndexSearchTerms; import nu.marginalia.wmsa.edge.index.model.IndexBlock; import nu.marginalia.wmsa.edge.index.reader.SearchIndexReader; -import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriter; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriter; import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget; import nu.marginalia.wmsa.edge.index.reader.query.Query; import org.jetbrains.annotations.NotNull; @@ -31,7 +29,7 @@ public class EdgeIndexBucket { @NotNull private final IndexServicesFactory servicesFactory; private final EdgeIndexControl indexControl; - private final SearchIndexWriter writer; + private final SearchIndexJournalWriter writer; private final int id; diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexControl.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexControl.java index ab7c73fe..8df32c0a 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexControl.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexControl.java @@ -23,7 +23,7 @@ public class EdgeIndexControl { for (IndexBlock block : IndexBlock.values()) { try { - servicesFactory.getIndexConverter(id, block); + servicesFactory.convertIndex(id, block); System.runFinalization(); System.gc(); @@ -40,10 +40,6 @@ public class EdgeIndexControl { System.gc(); } - public long wordCount(int id) { - return servicesFactory.wordCount(id); - } - public void switchIndexFiles(int id) throws Exception { servicesFactory.switchFilesJob(id).call(); } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexService.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexService.java index de6276a8..829a59af 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexService.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/EdgeIndexService.java @@ -11,12 +11,17 @@ import gnu.trove.set.hash.TIntHashSet; import io.prometheus.client.Counter; import io.prometheus.client.Histogram; import io.reactivex.rxjava3.schedulers.Schedulers; +import marcono1234.gson.recordadapter.RecordTypeAdapterFactory; +import nu.marginalia.util.ListChunker; import nu.marginalia.wmsa.configuration.server.Initialization; import nu.marginalia.wmsa.configuration.server.MetricsServer; import nu.marginalia.wmsa.configuration.server.Service; +import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntryHeader; import nu.marginalia.wmsa.edge.index.model.*; import nu.marginalia.wmsa.edge.index.reader.SearchIndexes; -import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl; import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget; import nu.marginalia.util.dict.DictionaryHashMap; import nu.marginalia.wmsa.edge.model.*; @@ -48,8 +53,11 @@ public class EdgeIndexService extends Service { @NotNull private final Initialization init; private final SearchIndexes indexes; + private final DictionaryWriter dictionaryWriter; - private final Gson gson = new GsonBuilder().create(); + private final Gson gson = new GsonBuilder() + .registerTypeAdapterFactory(RecordTypeAdapterFactory.builder().allowMissingComponentValues().create()) + .create(); private static final Histogram wmsa_edge_index_query_time = Histogram.build().name("wmsa_edge_index_query_time").help("-").register(); @@ -66,12 +74,13 @@ public class EdgeIndexService extends Service { @Named("service-port") Integer port, Initialization init, MetricsServer metricsServer, - SearchIndexes indexes - ) { + SearchIndexes indexes, + IndexServicesFactory servicesFactory) { super(ip, port, init, metricsServer); this.init = init; this.indexes = indexes; + this.dictionaryWriter = servicesFactory.getDictionaryWriter(); Spark.post("/words/", this::putWords); Spark.post("/search/", this::search, gson::toJson); @@ -173,29 +182,19 @@ public class EdgeIndexService extends Service { public void putWords(EdgeId domainId, EdgeId urlId, EdgePageWords words, int idx ) { - SearchIndexWriterImpl indexWriter = indexes.getIndexWriter(idx); + SearchIndexJournalWriterImpl indexWriter = indexes.getIndexWriter(idx); - if (!words.words.isEmpty()) { - if (words.size() < 1000) { - indexWriter.put(domainId, urlId, words.block, words.words); - } else { - chunks(words.words, 1000).forEach(chunk -> { - indexWriter.put(domainId, urlId, words.block, chunk); - }); - } - } + for (var chunk : ListChunker.chopList(words.words, SearchIndexJournalEntry.MAX_LENGTH)) { + + var entry = new SearchIndexJournalEntry(getWordIds(chunk)); + var header = new SearchIndexJournalEntryHeader(domainId, urlId, words.block); + + indexWriter.put(header, entry); + }; } - - private List> chunks(Collection coll, int size) { - List> ret = new ArrayList<>(); - List data = List.copyOf(coll); - - for (int i = 0; i < data.size(); i+=size) { - ret.add(data.subList(i, Math.min(data.size(), i+size))); - } - - return ret; + private long[] getWordIds(List words) { + return words.stream().filter(w -> w.length() < Byte.MAX_VALUE).mapToLong(dictionaryWriter::get).toArray(); } private Object search(Request request, Response response) { @@ -341,7 +340,7 @@ public class EdgeIndexService extends Service { getQuery(i, budget, sq.block, lv -> localFilter.filterRawValue(i, lv), searchTerms) .mapToObj(id -> new EdgeSearchResultItem(i, sq.termSize(), id)) - .filter(ri -> !seenResults.contains(ri.url.getId()) && localFilter.test(i, domainCountFilter, ri)) + .filter(ri -> !seenResults.contains(ri.url.id()) && localFilter.test(i, domainCountFilter, ri)) .limit(specs.limitTotal * 3L) .distinct() .limit(Math.min(specs.limitByBucket @@ -350,7 +349,7 @@ public class EdgeIndexService extends Service { for (var result : resultsForBucket) { - seenResults.add(result.url.getId()); + seenResults.add(result.url.id()); } for (var result : resultsForBucket) { for (var searchTerm : sq.searchTermsInclude) { @@ -401,7 +400,7 @@ public class EdgeIndexService extends Service { public boolean filterRawValue(int bucket, long value) { var domain = new EdgeId((int)(value >>> 32)); - if (domain.getId() == Integer.MAX_VALUE) { + if (domain.id() == Integer.MAX_VALUE) { return true; } @@ -409,11 +408,11 @@ public class EdgeIndexService extends Service { } long getKey(int bucket, EdgeId id) { - return ((long)bucket) << 32 | id.getId(); + return ((long)bucket) << 32 | id.id(); } public boolean test(int bucket, EdgeSearchResultItem item) { - if (item.domain.getId() == Integer.MAX_VALUE) { + if (item.domain.id() == Integer.MAX_VALUE) { return true; } @@ -431,7 +430,7 @@ public class EdgeIndexService extends Service { } public boolean test(int bucket, DomainResultCountFilter root, EdgeSearchResultItem item) { - if (item.domain.getId() == Integer.MAX_VALUE) { + if (item.domain.id() == Integer.MAX_VALUE) { return true; } return root.getCount(bucket, item) + resultsByDomain.adjustOrPutValue(getKey(bucket, item.domain), 1, 1) <= limitByDomain; diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java index 61e64b41..40c733e2 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/IndexServicesFactory.java @@ -8,7 +8,7 @@ import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist; import nu.marginalia.wmsa.edge.index.conversion.ConversionUnnecessaryException; import nu.marginalia.wmsa.edge.index.conversion.SearchIndexConverter; import nu.marginalia.wmsa.edge.index.conversion.SearchIndexPreconverter; -import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl; import nu.marginalia.wmsa.edge.index.model.IndexBlock; import nu.marginalia.wmsa.edge.index.dictionary.DictionaryReader; import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter; @@ -44,7 +44,8 @@ public class IndexServicesFactory { private final DoublePartitionedDataFile indexWriteUrlsFile; private volatile static DictionaryWriter dictionaryWriter; private final Long dictionaryHashMapSize; - private final SearchIndexPartitioner partitoner; + private final SearchIndexPartitioner partitioner; + @Inject public IndexServicesFactory( @Named("tmp-file-dir") Path tmpFileDir, @@ -59,7 +60,7 @@ public class IndexServicesFactory { @Named("edge-index-write-urls-file") String indexWriteUrlsFile, @Named("edge-dictionary-hash-map-size") Long dictionaryHashMapSize, EdgeDomainBlacklist domainBlacklist, - SearchIndexPartitioner partitoner + SearchIndexPartitioner partitioner ) { this.tmpFileDir = tmpFileDir; @@ -73,11 +74,11 @@ public class IndexServicesFactory { this.indexWriteWordsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteWordsFile); this.indexWriteUrlsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteUrlsFile); this.preconverterOutputFile = new PartitionedDataFile(partitionRootSlowTmp, "preconverted.dat"); - this.partitoner = partitoner; + this.partitioner = partitioner; } - public SearchIndexWriterImpl getIndexWriter(int idx) { - return new SearchIndexWriterImpl(getDictionaryWriter(), writerIndexFile.get(idx)); + public SearchIndexJournalWriterImpl getIndexWriter(int idx) { + return new SearchIndexJournalWriterImpl(getDictionaryWriter(), writerIndexFile.get(idx)); } public DictionaryWriter getDictionaryWriter() { @@ -93,15 +94,17 @@ public class IndexServicesFactory { } - public SearchIndexConverter getIndexConverter(int id, IndexBlock block) throws ConversionUnnecessaryException, IOException { - return new SearchIndexConverter(block, id, tmpFileDir, + public void convertIndex(int id, IndexBlock block) throws ConversionUnnecessaryException, IOException { + var converter = new SearchIndexConverter(block, id, tmpFileDir, preconverterOutputFile.get(id), indexWriteWordsFile.get(id, block.id), indexWriteUrlsFile.get(id, block.id), - partitoner, + partitioner, domainBlacklist ); + converter.convert(); } + @SneakyThrows public SearchIndexPreconverter getIndexPreconverter() { File[] outputFiles = new File[DYNAMIC_BUCKET_LENGTH+1]; @@ -110,7 +113,7 @@ public class IndexServicesFactory { } return new SearchIndexPreconverter(writerIndexFile.get(0), outputFiles, - partitoner, + partitioner, domainBlacklist ); } @@ -119,10 +122,6 @@ public class IndexServicesFactory { return preconverterOutputFile.get(i); } - public long wordCount(int id) { - return SearchIndexConverter.wordCount(writerIndexFile.get(0)); - } - @SneakyThrows public SearchIndexReader getIndexReader(int id) { EnumMap indexMap = new EnumMap<>(IndexBlock.class); diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexConverter.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexConverter.java index 0827b4e7..2d12d0f4 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexConverter.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexConverter.java @@ -1,331 +1,222 @@ package nu.marginalia.wmsa.edge.index.conversion; -import com.google.inject.Inject; -import com.google.inject.name.Named; -import gnu.trove.set.hash.TIntHashSet; -import lombok.RequiredArgsConstructor; -import lombok.SneakyThrows; import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist; import nu.marginalia.wmsa.edge.index.conversion.words.WordIndexOffsetsTable; -import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalReader; import nu.marginalia.wmsa.edge.index.model.IndexBlock; import nu.marginalia.wmsa.edge.index.conversion.words.WordsTableWriter; import nu.marginalia.util.btree.BTreeWriter; import nu.marginalia.util.btree.model.BTreeContext; import nu.marginalia.util.multimap.MultimapFileLong; import nu.marginalia.util.RandomWriteFunnel; -import nu.marginalia.util.multimap.MultimapSorter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.*; -import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.file.Files; import java.nio.file.Path; -import java.util.concurrent.locks.Lock; + +import static nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry.MAX_LENGTH; public class SearchIndexConverter { - private static final long FILE_HEADER_SIZE = 12; - private static final int CHUNK_HEADER_SIZE = 16; - public static final BTreeContext urlsBTreeContext = new BTreeContext(5, 1, ~0, 8); - private final long fileLength; - private final long urlsFileSize; + private final long[] tmpWordsBuffer = new long[MAX_LENGTH]; + private final Path tmpFileDir; - private final FileChannel urlsTmpFileChannel; - private final int wordCount; - private final MultimapFileLong urlsTmpFileMap; private final Logger logger = LoggerFactory.getLogger(getClass()); private final IndexBlock block; private final int bucketId; + private final File inputFile; + private final File outputFileWords; + private final File outputFileUrls; - private final File urlsFile; private final SearchIndexPartitioner partitioner; - private final TIntHashSet spamDomains; - private final MultimapSorter urlTmpFileSorter; + private final EdgeDomainBlacklist blacklist; private final static int internalSortLimit = Boolean.getBoolean("small-ram") ? 1024*1024 : 1024*1024*256; - @SneakyThrows - public static long wordCount(File inputFile) { - try (RandomAccessFile raf = new RandomAccessFile(inputFile, "r")) { - raf.readLong(); - return raf.readInt(); - } - } - - @Inject public SearchIndexConverter(IndexBlock block, - int bucketId, @Named("tmp-file-dir") Path tmpFileDir, - @Named("edge-writer-page-index-file") File inputFile, - @Named("edge-index-write-words-file") File outputFileWords, - @Named("edge-index-write-urls-file") File outputFileUrls, + int bucketId, + Path tmpFileDir, + File inputFile, + File outputFileWords, + File outputFileUrls, SearchIndexPartitioner partitioner, EdgeDomainBlacklist blacklist) - throws ConversionUnnecessaryException, IOException { this.block = block; this.bucketId = bucketId; this.tmpFileDir = tmpFileDir; - this.urlsFile = outputFileUrls; + this.inputFile = inputFile; + this.outputFileWords = outputFileWords; + this.outputFileUrls = outputFileUrls; this.partitioner = partitioner; - this.spamDomains = blacklist.getSpamDomains(); - - logger.info("Converting {} ({}) {}", block.id, block, inputFile); + this.blacklist = blacklist; + } + public void convert() throws IOException { Files.deleteIfExists(outputFileWords.toPath()); Files.deleteIfExists(outputFileUrls.toPath()); - final RandomAccessFile raf = new RandomAccessFile(inputFile, "r"); + SearchIndexJournalReader journalReader = new SearchIndexJournalReader(MultimapFileLong.forReading(inputFile.toPath())); - this.fileLength = raf.readLong(); - this.wordCount = raf.readInt(); - - if (fileLength <= FILE_HEADER_SIZE) { - throw new ConversionUnnecessaryException(); + if (journalReader.fileHeader.fileSize() <= SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES) { + return; } - var inputChannel = raf.getChannel(); + logger.info("Converting {} ({}) {} {}", block.id, block, inputFile, journalReader.fileHeader); - ByteBuffer buffer = ByteBuffer.allocateDirect(10_000); + var lock = partitioner.getReadLock(); + try { + lock.lock(); - urlsFileSize = getUrlsSize(buffer, inputChannel); + var tmpUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat"); - var tmpUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat"); - var urlsTmpFileRaf = new RandomAccessFile(tmpUrlsFile.toFile(), "rw"); - urlsTmpFileChannel = urlsTmpFileRaf.getChannel(); - urlsTmpFileMap = new MultimapFileLong(urlsTmpFileRaf, FileChannel.MapMode.READ_WRITE, urlsFileSize, 8*1024*1024, false); - urlTmpFileSorter = urlsTmpFileMap.createSorter(tmpFileDir, internalSortLimit); + logger.info("Creating word index table {} for block {} ({})", outputFileWords, block.id, block); + WordIndexOffsetsTable wordIndexTable = createWordIndexTable(journalReader, outputFileWords); - logger.info("Creating word index table {} for block {} ({})", outputFileWords, block.id, block); - WordIndexOffsetsTable wordIndexTable = createWordIndexTable(outputFileWords, inputChannel); + logger.info("Creating word urls table {} for block {} ({})", outputFileUrls, block.id, block); + createUrlTable(journalReader, tmpUrlsFile, wordIndexTable); - logger.info("Creating word urls table {} for block {} ({})", outputFileUrls, block.id, block); - createUrlTable(buffer, raf, wordIndexTable); - - Files.delete(tmpUrlsFile); - raf.close(); - - urlsTmpFileChannel.close(); - urlsTmpFileMap.force(); - - } - - private boolean isUrlAllowed(long url) { - return !spamDomains.contains((int)(url >>> 32)); - } - - public long translateUrl(long url) { - int domainId = partitioner.translateId(bucketId, (int) (url >>> 32)); - return ((long)domainId << 32) | (url & 0xFFFFFFFFL); - } - - - private long getUrlsSize(ByteBuffer buffer, FileChannel channel) throws IOException { - channel.position(FILE_HEADER_SIZE); - - var reader = new IndexReader(buffer, channel) { - public long size; - - @Override - public void eachWord(long urlId, int wordId) { - size++; - } - }; - - reader.read(); - - logger.info("Blacklist filtered {} URLs", reader.filtered); - logger.debug("URLs Size {} Mb", channel.position()/(1024*1024)); - - return reader.size; - } - - private void createUrlTable(ByteBuffer buffer, RandomAccessFile raf, WordIndexOffsetsTable wordOffsetsTable) throws IOException { - logger.info("Table size = {}", wordOffsetsTable.length()); - - raf.seek(FILE_HEADER_SIZE); - - var channel = raf.getChannel(); - - try (RandomWriteFunnel rwf = new RandomWriteFunnel(tmpFileDir, urlsFileSize, 10_000_000)) { - int[] wordWriteOffset = new int[wordOffsetsTable.length()]; - - new IndexReader(buffer, channel) { - @Override - public void eachWord(long urlId, int wordId) throws IOException { - if (wordId >= wordWriteOffset.length) - return; - - if (wordId > 0) { - rwf.put(wordOffsetsTable.get(wordId - 1) + wordWriteOffset[wordId]++, translateUrl(urlId)); - } else { - rwf.put(wordWriteOffset[wordId]++, translateUrl(urlId)); - } - } - }.read(); - - rwf.write(urlsTmpFileChannel); + Files.delete(tmpUrlsFile); } - - urlsTmpFileChannel.force(false); - logger.info("URL TMP Table: {} Mb", channel.position()/(1024*1024)); - - if (wordOffsetsTable.length() > 0) { - logger.info("Sorting urls table"); - - wordOffsetsTable.forEach(urlTmpFileSorter::sort); - - urlsTmpFileMap.force(); - } - else { - logger.warn("urls table empty -- nothing to sort"); - } - - logger.info("Writing BTree"); - try (var urlsFileMap = MultimapFileLong.forOutput(urlsFile.toPath(), 1024)) { - var writer = new BTreeWriter(urlsFileMap, urlsBTreeContext); - - wordOffsetsTable.fold((accumulatorIdx, start, length) -> { - // Note: The return value is accumulated into accumulatorIdx! - - return writer.write(accumulatorIdx, length, - slice -> slice.transferFromFileChannel(urlsTmpFileChannel, 0, start, start + length)); - }); - - } catch (Exception e) { - logger.error("Error while writing BTree", e); + finally { + lock.unlock(); } } - private WordIndexOffsetsTable createWordIndexTable(File outputFileWords, FileChannel inputChannel) throws IOException { - inputChannel.position(FILE_HEADER_SIZE); - logger.debug("Table size = {}", wordCount); - WordsTableWriter wordsTableWriter = new WordsTableWriter(wordCount); - ByteBuffer buffer = ByteBuffer.allocateDirect(8* SearchIndexWriterImpl.MAX_BLOCK_SIZE); + + private WordIndexOffsetsTable createWordIndexTable(SearchIndexJournalReader journalReader, + File outputFileWords) throws IOException + { + final int topWord = (int) journalReader.fileHeader.wordCount(); + + logger.debug("Table size = {}", topWord); + WordsTableWriter wordsTableWriter = new WordsTableWriter(topWord); logger.debug("Reading words"); - var reader = new IndexReader(buffer, inputChannel) { - @Override - public void eachWord(long urlId, int wordId) { + for (var entry : journalReader) { + if (!isRelevantEntry(entry)) { + continue; + } + + final SearchIndexJournalEntry entryData = entry.readEntryUsingBuffer(tmpWordsBuffer); + + for (int i = 0; i < entryData.size(); i++) { + int wordId = (int) entryData.get(i); + if (wordId < 0 || wordId >= topWord) { + logger.warn("Bad wordId {}", wordId); + } wordsTableWriter.acceptWord(wordId); } - }; - reader.read(); + } logger.debug("Rearranging table"); - inputChannel.position(FILE_HEADER_SIZE); - wordsTableWriter.write(outputFileWords); return wordsTableWriter.getTable(); } - @RequiredArgsConstructor - private class IndexReader { - private final ByteBuffer buffer; - private final FileChannel channel; - public long filtered; + private void createUrlTable(SearchIndexJournalReader journalReader, + Path tmpUrlsFile, + WordIndexOffsetsTable wordOffsetsTable) throws IOException + { + logger.info("Table size = {}", wordOffsetsTable.length()); - public void read() throws IOException { - var lock = partitioner.getReadLock(); - try { - lock.lock(); - outer: - while (channel.position() < fileLength) { - buffer.clear(); - buffer.limit(CHUNK_HEADER_SIZE); - channel.read(buffer); - buffer.flip(); - long urlId = buffer.getLong(); - int chunkBlock = buffer.getInt(); - int count = buffer.getInt(); + long numberOfWordsTotal = 0; + for (var entry : journalReader) { + if (isRelevantEntry(entry)) + numberOfWordsTotal += entry.wordCount(); + } - if (count > 1000) { - int tries = 0; - logger.warn("Terminating garbage @{}b, attempting repair", channel.position()); + try (RandomAccessFile urlsTmpFileRAF = new RandomAccessFile(tmpUrlsFile.toFile(), "rw"); + FileChannel urlsTmpFileChannel = urlsTmpFileRAF.getChannel()) { - for (; ; ) { - tries++; - long p = channel.position(); - buffer.clear(); - buffer.limit(8); - if (channel.read(buffer) != 8) { - break outer; // EOF...? - } + try (RandomWriteFunnel rwf = new RandomWriteFunnel(tmpFileDir, numberOfWordsTotal, 10_000_000)) { + int[] wordWriteOffset = new int[wordOffsetsTable.length()]; - buffer.flip(); - int pcb = buffer.getInt(); - int pct = buffer.getInt(); - if (pcb == 0 || pcb == 1 && pct >= 0 && pct <= 1000) { - chunkBlock = pcb; - count = pct; - break; - } else { - channel.position(p + 1); - } + for (var entry : journalReader) { + if (!isRelevantEntry(entry)) continue; + + var entryData = entry.readEntryUsingBuffer(tmpWordsBuffer); + + for (int i = 0; i < entryData.size(); i++) { + int wordId = (int) entryData.get(i); + + if (wordId >= wordWriteOffset.length) + continue; + if (wordId < 0) { + logger.warn("Negative wordId {}", wordId); } - logger.warn("Skipped {}b", tries); - } - buffer.clear(); - buffer.limit(count * 4); - - int trb = 0; - while (trb < count * 4) { - int rb = channel.read(buffer); - if (rb <= 0) { - throw new ArrayIndexOutOfBoundsException(trb + " - " + count * 4 + " " + rb); + final long urlInternal = translateUrl(entry.docId()); + if (wordId > 0) { + rwf.put(wordOffsetsTable.get(wordId - 1) + wordWriteOffset[wordId]++, urlInternal); + } else { + rwf.put(wordWriteOffset[wordId]++, urlInternal); } - trb += rb; - } - - buffer.flip(); - - if (isUrlAllowed(urlId)) { - if (block.id == chunkBlock) { - eachUrl(lock, count, urlId); - } - } else { - filtered++; } } - } - finally { - lock.unlock(); - } - } - public void eachUrl(Lock lock, int count, long urlId) throws IOException { - for (int i = 0; i < count; i++) { - int wordId = buffer.getInt(); - if (acceptWord(lock, urlId)) { - eachWord(urlId, wordId); + + rwf.write(urlsTmpFileChannel); + } + + urlsTmpFileChannel.force(false); + + try (var urlsTmpFileMap = MultimapFileLong.forOutput(tmpUrlsFile, numberOfWordsTotal)) { + if (wordOffsetsTable.length() > 0) { + logger.info("Sorting urls table"); + + var urlTmpFileSorter = urlsTmpFileMap.createSorter(tmpFileDir, internalSortLimit); + + wordOffsetsTable.forEachRange(urlTmpFileSorter::sort); + + urlsTmpFileMap.force(); + } else { + logger.warn("urls table empty -- nothing to sort"); } } - } - public void eachWord(long urlId, int wordId) throws IOException { - } + logger.info("Writing BTree"); + try (var urlsFileMap = MultimapFileLong.forOutput(outputFileUrls.toPath(), numberOfWordsTotal)) { + var writer = new BTreeWriter(urlsFileMap, urlsBTreeContext); - boolean acceptWord(Lock lock, long urlId) { - int domainId = (int) (urlId >>> 32L); + wordOffsetsTable.foldRanges((accumulatorIdx, start, length) -> { + // Note: The return value is accumulated into accumulatorIdx! - if (!partitioner.filterUnsafe(lock, domainId, bucketId)) { - return false; + return writer.write(accumulatorIdx, length, + slice -> slice.transferFromFileChannel(urlsTmpFileChannel, 0, start, start + length)); + }); + + } catch (Exception e) { + logger.error("Error while writing BTree", e); } - return true; } } + + + private long translateUrl(long url) { + int domainId = partitioner.translateId(bucketId, (int) (url >>> 32)); + return ((long)domainId << 32) | (url & 0xFFFFFFFFL); + } + + private boolean isRelevantEntry(SearchIndexJournalReader.JournalEntry entry) { + return block.equals(entry.header.block()) + && !blacklist.isBlacklisted(entry.domainId()) + && partitioner.filterUnsafe(entry.domainId(), bucketId); + } + } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPartitioner.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPartitioner.java index bf5a1d74..2f2e9d47 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPartitioner.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPartitioner.java @@ -122,7 +122,7 @@ public class SearchIndexPartitioner { public Lock getReadLock() { return rwl.readLock(); } - public boolean filterUnsafe(Lock lock, int domainId, int bucketId) { + public boolean filterUnsafe(int domainId, int bucketId) { return partitionSet.test(domainId, bucketId); } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java index 9e851025..5357fc1f 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/SearchIndexPreconverter.java @@ -3,7 +3,9 @@ package nu.marginalia.wmsa.edge.index.conversion; import com.google.inject.Inject; import gnu.trove.set.hash.TIntHashSet; import lombok.SneakyThrows; +import nu.marginalia.util.multimap.MultimapFileLong; import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalReader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,23 +48,16 @@ public class SearchIndexPreconverter { } } - final RandomAccessFile raf = new RandomAccessFile(inputFile, "r"); + SearchIndexJournalReader indexJournalReader = new SearchIndexJournalReader(MultimapFileLong.forReading(inputFile.toPath())); - var fileLength = raf.readLong(); - var wordCount = raf.readInt(); - final int wordCountOriginal = wordCount; + final long wordCountOriginal = indexJournalReader.fileHeader.wordCount(); - logger.info("Word Count: {}", wordCount); - logger.info("File Length: {}", fileLength); - - var channel = raf.getChannel(); - - ByteBuffer inByteBuffer = ByteBuffer.allocateDirect(10_000); + logger.info("{}", indexJournalReader.fileHeader); RandomAccessFile[] randomAccessFiles = new RandomAccessFile[outputFiles.length]; for (int i = 0; i < randomAccessFiles.length; i++) { randomAccessFiles[i] = new RandomAccessFile(outputFiles[i], "rw"); - randomAccessFiles[i].seek(12); + randomAccessFiles[i].seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES); } FileChannel[] fileChannels = new FileChannel[outputFiles.length]; for (int i = 0; i < fileChannels.length; i++) { @@ -73,33 +68,24 @@ public class SearchIndexPreconverter { var lock = partitioner.getReadLock(); try { lock.lock(); + ByteBuffer buffer = ByteBuffer.allocateDirect(8192); - while (channel.position() < fileLength) { - inByteBuffer.clear(); - inByteBuffer.limit(CHUNK_HEADER_SIZE); - channel.read(inByteBuffer); - inByteBuffer.flip(); - long urlId = inByteBuffer.getLong(); - int chunkBlock = inByteBuffer.getInt(); - int count = inByteBuffer.getInt(); - // inByteBuffer.clear(); - inByteBuffer.limit(count * 4 + CHUNK_HEADER_SIZE); - channel.read(inByteBuffer); - inByteBuffer.position(CHUNK_HEADER_SIZE); - - for (int i = 0; i < count; i++) { - wordCount = Math.max(wordCount, 1 + inByteBuffer.getInt()); + for (var entry : indexJournalReader) { + if (!partitioner.isGoodUrl(entry.urlId()) + || spamDomains.contains(entry.domainId())) { + continue; } - inByteBuffer.position(count * 4 + CHUNK_HEADER_SIZE); + int domainId = entry.domainId(); + buffer.clear(); + entry.copyToBuffer(buffer); + for (int i = 0; i < randomAccessFiles.length; i++) { + if (partitioner.filterUnsafe(domainId, i)) { + buffer.flip(); - if (isUrlAllowed(urlId)) { - for (int i = 0; i < randomAccessFiles.length; i++) { - if (partitioner.filterUnsafe(lock, (int) (urlId >>> 32L), i)) { - inByteBuffer.flip(); - fileChannels[i].write(inByteBuffer); - } + while (buffer.position() < buffer.limit()) + fileChannels[i].write(buffer); } } } @@ -108,27 +94,16 @@ public class SearchIndexPreconverter { lock.unlock(); } - if (wordCountOriginal < wordCount) { - logger.warn("Raised word count {} => {}", wordCountOriginal, wordCount); - } - for (int i = 0; i < randomAccessFiles.length; i++) { long pos = randomAccessFiles[i].getFilePointer(); randomAccessFiles[i].seek(0); randomAccessFiles[i].writeLong(pos); - randomAccessFiles[i].writeInt(wordCount); + randomAccessFiles[i].writeLong(wordCountOriginal); fileChannels[i].force(true); fileChannels[i].close(); randomAccessFiles[i].close(); } } - private boolean isUrlAllowed(long url) { - int urlId = (int)(url & 0xFFFF_FFFFL); - int domainId = (int)(url >>> 32); - - return partitioner.isGoodUrl(urlId) && !spamDomains.contains(domainId); - } - } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/words/WordIndexOffsetsTable.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/words/WordIndexOffsetsTable.java index 29b88509..f1308d6e 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/words/WordIndexOffsetsTable.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/conversion/words/WordIndexOffsetsTable.java @@ -16,7 +16,7 @@ public class WordIndexOffsetsTable { return table.length; } - public void forEach(OffsetTableEntryConsumer o) throws IOException { + public void forEachRange(OffsetTableEntryConsumer o) throws IOException { if (table[0] > 0) { o.accept(0, (int) table[0]); } @@ -32,9 +32,9 @@ public class WordIndexOffsetsTable { } /** - * Fold over each span in the file, left to right + * Fold over each span in the file, left to right, accumulating the return value */ - public long fold(OffsetTableEntryFoldConsumer o) throws IOException { + public long foldRanges(OffsetTableEntryFoldConsumer o) throws IOException { long total = 0; if (table[0] > 0) { diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntry.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntry.java new file mode 100644 index 00000000..493eea40 --- /dev/null +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntry.java @@ -0,0 +1,49 @@ +package nu.marginalia.wmsa.edge.index.journal; + +import java.nio.ByteBuffer; +import java.util.Arrays; + +public class SearchIndexJournalEntry { + private final int size; + private final long[] underlyingArray; + + public static final int MAX_LENGTH = 1000; + + public SearchIndexJournalEntry(long[] underlyingArray) { + this.size = underlyingArray.length; + this.underlyingArray = underlyingArray; + } + + public SearchIndexJournalEntry(int size, long[] underlyingArray) { + this.size = size; + this.underlyingArray = underlyingArray; + } + + public void write(ByteBuffer buffer) { + for (int i = 0; i < size; i++) { + buffer.putLong(underlyingArray[i]); + } + } + + public long get(int idx) { + if (idx >= size) + throw new ArrayIndexOutOfBoundsException(); + return underlyingArray[idx]; + } + + public int size() { + return size; + } + + public long[] toArray() { + if (size == underlyingArray.length) + return underlyingArray; + else + return Arrays.copyOf(underlyingArray, size); + } + + public String toString() { + return String.format("%s[%s]", getClass().getSimpleName(), Arrays.toString(toArray())); + } + +} diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntryHeader.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntryHeader.java new file mode 100644 index 00000000..f635b1d4 --- /dev/null +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalEntryHeader.java @@ -0,0 +1,16 @@ +package nu.marginalia.wmsa.edge.index.journal; + +import nu.marginalia.wmsa.edge.index.model.IndexBlock; +import nu.marginalia.wmsa.edge.model.EdgeDomain; +import nu.marginalia.wmsa.edge.model.EdgeId; +import nu.marginalia.wmsa.edge.model.EdgeUrl; + +public record SearchIndexJournalEntryHeader(int entrySize, long documentId, IndexBlock block) { + + public static final int HEADER_SIZE_LONGS = 2; + + public SearchIndexJournalEntryHeader( EdgeId domainId, EdgeId urlId, IndexBlock block) { + this(-1, (long) domainId.id() << 32 | urlId.id(), block); + } + +} diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalFileHeader.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalFileHeader.java new file mode 100644 index 00000000..49ac5009 --- /dev/null +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalFileHeader.java @@ -0,0 +1,4 @@ +package nu.marginalia.wmsa.edge.index.journal; + +public record SearchIndexJournalFileHeader(long fileSize, long wordCount) { +} diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalReader.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalReader.java new file mode 100644 index 00000000..0e11646a --- /dev/null +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalReader.java @@ -0,0 +1,123 @@ +package nu.marginalia.wmsa.edge.index.journal; + +import com.upserve.uppend.blobs.NativeIO; +import nu.marginalia.util.multimap.MultimapFileLong; +import nu.marginalia.util.multimap.MultimapFileLongSlice; +import nu.marginalia.wmsa.edge.index.model.IndexBlock; +import org.jetbrains.annotations.NotNull; + +import java.nio.ByteBuffer; +import java.util.Iterator; + +import static nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntryHeader.HEADER_SIZE_LONGS; + +public class SearchIndexJournalReader implements Iterable { + public static final long FILE_HEADER_SIZE_LONGS = 2; + public static final long FILE_HEADER_SIZE_BYTES = 8*FILE_HEADER_SIZE_LONGS; + + public final SearchIndexJournalFileHeader fileHeader; + + private final MultimapFileLongSlice map; + private final long committedSize; + + public SearchIndexJournalReader(MultimapFileLong map) { + fileHeader = new SearchIndexJournalFileHeader(map.get(0), map.get(1)); + committedSize = map.get(0) / 8 - FILE_HEADER_SIZE_LONGS; + + map.advice(NativeIO.Advice.Sequential); + + this.map = map.atOffset(FILE_HEADER_SIZE_LONGS); + } + + @NotNull + @Override + public Iterator iterator() { + return new JournalEntryIterator(); + } + + private class JournalEntryIterator implements Iterator { + private JournalEntry entry; + + @Override + public boolean hasNext() { + if (entry == null) { + return committedSize > 0; + } + + return entry.hasNext(); + } + + @Override + public JournalEntry next() { + if (entry == null) { + entry = new JournalEntry(0); + } + else { + entry = entry.next(); + } + return entry; + } + } + + public class JournalEntry { + private final long offset; + public final SearchIndexJournalEntryHeader header; + + JournalEntry(long offset) { + final long sizeBlock = map.get(offset); + final long docId = map.get(offset + 1); + + this.offset = offset; + this.header = new SearchIndexJournalEntryHeader( + (int)(sizeBlock >>> 32L), + docId, + IndexBlock.byId((int)(sizeBlock & 0xFFFF_FFFFL))); + } + + public boolean hasNext() { + return nextId() < committedSize; + } + public long docId() { + return header.documentId(); + } + public int domainId() { + return (int) (docId() >>> 32L); + } + public int urlId() { + return (int)(docId() & 0xFFFF_FFFFL); + } + public IndexBlock block() { + return header.block(); + } + public int wordCount() { return header.entrySize(); } + + public SearchIndexJournalEntry readEntry() { + long[] dest = new long[header.entrySize()]; + map.read(dest, offset + HEADER_SIZE_LONGS); + return new SearchIndexJournalEntry(header.entrySize(), dest); + } + + public SearchIndexJournalEntry readEntryUsingBuffer(long[] dest) { + if (dest.length >= header.entrySize()) { + map.read(dest, header.entrySize(), offset + HEADER_SIZE_LONGS); + return new SearchIndexJournalEntry(header.entrySize(), dest); + } + else { + return readEntry(); + } + } + + public long nextId() { + return offset + HEADER_SIZE_LONGS + header.entrySize(); + } + public JournalEntry next() { return new JournalEntry(nextId()); } + + public void copyToBuffer(ByteBuffer buffer) { + var dest = buffer.asLongBuffer(); + dest.position(buffer.position() * 8); + dest.limit(buffer.position()*8 + header.entrySize() + HEADER_SIZE_LONGS); + map.read(dest, offset); + buffer.position(dest.limit()*8); + } + } +} diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriter.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriter.java new file mode 100644 index 00000000..4567a428 --- /dev/null +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriter.java @@ -0,0 +1,10 @@ +package nu.marginalia.wmsa.edge.index.journal; + +public interface SearchIndexJournalWriter { + void put(SearchIndexJournalEntryHeader header, SearchIndexJournalEntry entry); + + void forceWrite(); + + void flushWords(); + +} diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriterImpl.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriterImpl.java similarity index 68% rename from marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriterImpl.java rename to marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriterImpl.java index cf76ada2..f5ba8b31 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriterImpl.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexJournalWriterImpl.java @@ -3,11 +3,7 @@ package nu.marginalia.wmsa.edge.index.journal; import io.reactivex.rxjava3.disposables.Disposable; import io.reactivex.rxjava3.schedulers.Schedulers; import lombok.SneakyThrows; -import nu.marginalia.wmsa.edge.index.model.IndexBlock; import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter; -import nu.marginalia.wmsa.edge.model.EdgeDomain; -import nu.marginalia.wmsa.edge.model.EdgeId; -import nu.marginalia.wmsa.edge.model.EdgeUrl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -17,10 +13,9 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; -import java.util.List; import java.util.concurrent.TimeUnit; -public class SearchIndexWriterImpl implements SearchIndexWriter { +public class SearchIndexJournalWriterImpl implements SearchIndexJournalWriter { private final DictionaryWriter dictionaryWriter; private final Logger logger = LoggerFactory.getLogger(getClass()); @@ -28,12 +23,12 @@ public class SearchIndexWriterImpl implements SearchIndexWriter { private RandomAccessFile raf; private FileChannel channel; - public static final int MAX_BLOCK_SIZE = 1000*32*8*4; + public static final int MAX_BLOCK_SIZE = SearchIndexJournalEntry.MAX_LENGTH*32*8*4; private final ByteBuffer byteBuffer; private long pos; @SneakyThrows - public SearchIndexWriterImpl(DictionaryWriter dictionaryWriter, File indexFile) { + public SearchIndexJournalWriterImpl(DictionaryWriter dictionaryWriter, File indexFile) { this.dictionaryWriter = dictionaryWriter; initializeIndexFile(indexFile); @@ -61,23 +56,16 @@ public class SearchIndexWriterImpl implements SearchIndexWriter { @Override @SneakyThrows - public synchronized void put(EdgeId domainId, EdgeId urlId, IndexBlock block, List wordsSuspect) { - int numGoodWords = 0; - for (String word : wordsSuspect) { - if (word.length() < Byte.MAX_VALUE) numGoodWords++; - } + public synchronized void put(SearchIndexJournalEntryHeader header, SearchIndexJournalEntry entryData) { byteBuffer.clear(); - long url_id = ((long) domainId.getId() << 32) | urlId.getId(); - byteBuffer.putLong(url_id); - byteBuffer.putInt(block.id); - byteBuffer.putInt(numGoodWords); - for (String word : wordsSuspect) { - if (word.length() < Byte.MAX_VALUE) { - byteBuffer.putInt(dictionaryWriter.get(word)); - } - } + byteBuffer.putInt(entryData.size()); + byteBuffer.putInt(header.block().id); + byteBuffer.putLong(header.documentId()); + + entryData.write(byteBuffer); + byteBuffer.limit(byteBuffer.position()); byteBuffer.rewind(); @@ -104,11 +92,11 @@ public class SearchIndexWriterImpl implements SearchIndexWriter { } private void writePositionMarker() throws IOException { - var lock = channel.lock(0, 12, false); + var lock = channel.lock(0, 16, false); pos = channel.size(); raf.seek(0); raf.writeLong(pos); - raf.writeInt(dictionaryWriter.size()); + raf.writeLong(dictionaryWriter.size()); raf.seek(pos); lock.release(); } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriter.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriter.java deleted file mode 100644 index 11fc186a..00000000 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/journal/SearchIndexWriter.java +++ /dev/null @@ -1,16 +0,0 @@ -package nu.marginalia.wmsa.edge.index.journal; - -import nu.marginalia.wmsa.edge.index.model.IndexBlock; -import nu.marginalia.wmsa.edge.model.EdgeDomain; -import nu.marginalia.wmsa.edge.model.EdgeId; -import nu.marginalia.wmsa.edge.model.EdgeUrl; - -import java.util.List; - -public interface SearchIndexWriter { - void put(EdgeId domainId, EdgeId urlId, IndexBlock block, List words); - void forceWrite(); - - void flushWords(); - -} diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/reader/SearchIndexes.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/reader/SearchIndexes.java index 863c0c65..01ad1e20 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/reader/SearchIndexes.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/index/reader/SearchIndexes.java @@ -7,7 +7,7 @@ import nu.marginalia.wmsa.edge.index.IndexServicesFactory; import nu.marginalia.wmsa.edge.index.conversion.SearchIndexPartitioner; import nu.marginalia.wmsa.edge.index.EdgeIndexBucket; import nu.marginalia.wmsa.edge.index.dictionary.DictionaryReader; -import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -27,8 +27,8 @@ public class SearchIndexes { private final ReentrantLock opsLock = new ReentrantLock(false); - private final SearchIndexWriterImpl primaryIndexWriter; - private final SearchIndexWriterImpl secondaryIndexWriter; + private final SearchIndexJournalWriterImpl primaryIndexWriter; + private final SearchIndexJournalWriterImpl secondaryIndexWriter; private DictionaryReader dictionaryReader = null; @Inject @@ -134,7 +134,7 @@ public class SearchIndexes { } } - public SearchIndexWriterImpl getIndexWriter(int idx) { + public SearchIndexJournalWriterImpl getIndexWriter(int idx) { if (idx == 0) { return primaryIndexWriter; } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/EdgeId.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/EdgeId.java index f2be15fa..0ee908ef 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/EdgeId.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/EdgeId.java @@ -1,15 +1,10 @@ package nu.marginalia.wmsa.edge.model; -import lombok.AllArgsConstructor; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.ToString; -/** This exists entirely for strengthening the typing of IDs +/** + * This exists entirely for strengthening the typing of IDs * * @param */ -@AllArgsConstructor @Getter @EqualsAndHashCode @ToString -public class EdgeId { - private final int id; +public record EdgeId(int id) { } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/search/EdgeSearchResultItem.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/search/EdgeSearchResultItem.java index c6f4fbc5..66438279 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/search/EdgeSearchResultItem.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/model/search/EdgeSearchResultItem.java @@ -32,7 +32,7 @@ public class EdgeSearchResultItem { } public long getCombinedId() { - return ((long) domain.getId() << 32L) | url.getId(); + return ((long) domain.id() << 32L) | url.id(); } } diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/EdgeSearchOperator.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/EdgeSearchOperator.java index 66004dee..add46ef4 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/EdgeSearchOperator.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/EdgeSearchOperator.java @@ -121,7 +121,7 @@ public class EdgeSearchOperator { int domainId = -1; try { if (domain != null) { - return edgeDataStoreDao.getDomainId(new EdgeDomain(domain)).getId(); + return edgeDataStoreDao.getDomainId(new EdgeDomain(domain)).id(); } } catch (NoSuchElementException ex) { diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/command/commands/SiteSearchCommand.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/command/commands/SiteSearchCommand.java index 6e341721..193f1a1c 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/command/commands/SiteSearchCommand.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/command/commands/SiteSearchCommand.java @@ -3,7 +3,6 @@ package nu.marginalia.wmsa.edge.search.command.commands; import com.google.inject.Inject; import nu.marginalia.wmsa.configuration.server.Context; import nu.marginalia.wmsa.edge.data.dao.EdgeDataStoreDao; -import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist; import nu.marginalia.wmsa.edge.index.model.IndexBlock; import nu.marginalia.wmsa.edge.model.crawl.EdgeDomainIndexingState; import nu.marginalia.wmsa.edge.search.EdgeSearchOperator; @@ -12,7 +11,6 @@ import nu.marginalia.wmsa.edge.search.command.ResponseType; import nu.marginalia.wmsa.edge.search.command.SearchCommandInterface; import nu.marginalia.wmsa.edge.search.command.SearchParameters; import nu.marginalia.wmsa.edge.search.model.DecoratedSearchResultSet; -import nu.marginalia.wmsa.edge.search.model.DecoratedSearchResults; import nu.marginalia.wmsa.edge.search.model.DomainInformation; import nu.marginalia.wmsa.edge.search.siteinfo.DomainInformationService; import nu.marginalia.wmsa.renderer.mustache.MustacheRenderer; @@ -69,7 +67,7 @@ public class SiteSearchCommand implements SearchCommandInterface { if (null != domain) { resultSet = searchOperator.performDumbQuery(ctx, EdgeSearchProfile.CORPO, IndexBlock.Words, 100, 100, "site:"+domain); - screenshotPath = Path.of("/screenshot/" + dataStoreDao.getDomainId(domain).getId()); + screenshotPath = Path.of("/screenshot/" + dataStoreDao.getDomainId(domain).id()); } else { resultSet = new DecoratedSearchResultSet(Collections.emptyList()); diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/results/SearchResultDecorator.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/results/SearchResultDecorator.java index 22b24aca..12d358bf 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/results/SearchResultDecorator.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/results/SearchResultDecorator.java @@ -78,8 +78,8 @@ public class SearchResultDecorator { TIntArrayList missedIds = new TIntArrayList(); for (var resultItem : resultItems) { - var did = resultItem.getDomain().getId(); - var uid = resultItem.getUrl().getId(); + var did = resultItem.getDomain().id(); + var uid = resultItem.getUrl().id(); var details = detailsById.get(uid); if (details == null) { diff --git a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/siteinfo/DomainInformationService.java b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/siteinfo/DomainInformationService.java index 2f79a9ea..d3eb8061 100644 --- a/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/siteinfo/DomainInformationService.java +++ b/marginalia_nu/src/main/java/nu/marginalia/wmsa/edge/search/siteinfo/DomainInformationService.java @@ -2,7 +2,6 @@ package nu.marginalia.wmsa.edge.search.siteinfo; import com.zaxxer.hikari.HikariDataSource; import lombok.SneakyThrows; -import nu.marginalia.wmsa.edge.data.dao.EdgeDataStoreDao; import nu.marginalia.wmsa.edge.data.dao.EdgeDataStoreDaoImpl; import nu.marginalia.wmsa.edge.model.EdgeDomain; import nu.marginalia.wmsa.edge.model.EdgeId; @@ -98,7 +97,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT KNOWN_URLS FROM DOMAIN_METADATA WHERE ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getInt(1); @@ -115,7 +114,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT VISITED_URLS FROM DOMAIN_METADATA WHERE ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getInt(1); @@ -133,7 +132,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT GOOD_URLS FROM DOMAIN_METADATA WHERE ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getInt(1); @@ -150,7 +149,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE DEST_DOMAIN_ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getInt(1); @@ -166,7 +165,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getInt(1); @@ -183,7 +182,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT QUALITY FROM EC_DOMAIN WHERE ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getDouble(1); @@ -199,7 +198,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT STATE FROM EC_DOMAIN WHERE ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return EdgeDomainIndexingState.valueOf(rsp.getString(1)); @@ -216,8 +215,8 @@ public class DomainInformationService { public List getLinkingDomains(EdgeId domainId) { try (var connection = dataSource.getConnection()) { List results = new ArrayList<>(25); - try (var stmt = connection.prepareStatement("SELECT SOURCE_URL FROM EC_RELATED_LINKS_VIEW WHERE DEST_DOMAIN_ID=? ORDER BY SOURCE_DOMAIN_ID LIMIT 25")) { - stmt.setInt(1, domainId.getId()); + try (var stmt = connection.prepareStatement("SELECT SOURCE_DOMAIN FROM EC_RELATED_LINKS_VIEW WHERE DEST_DOMAIN_ID=? ORDER BY SOURCE_DOMAIN_ID LIMIT 25")) { + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); while (rsp.next()) { results.add(new EdgeDomain(rsp.getString(1))); @@ -237,7 +236,7 @@ public class DomainInformationService { try (var connection = dataSource.getConnection()) { try (var stmt = connection.prepareStatement("SELECT IFNULL(RANK, 1) FROM EC_DOMAIN WHERE ID=?")) { - stmt.setInt(1, domainId.getId()); + stmt.setInt(1, domainId.id()); var rsp = stmt.executeQuery(); if (rsp.next()) { return rsp.getDouble(1); diff --git a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/EdgeIndexClientTest.java b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/EdgeIndexClientTest.java index 2b2da0fd..55015d13 100644 --- a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/EdgeIndexClientTest.java +++ b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/EdgeIndexClientTest.java @@ -81,7 +81,8 @@ public class EdgeIndexClientTest { service = new EdgeIndexService("127.0.0.1", testPort, init, null, - indexes); + indexes, + servicesFactory); Spark.awaitInitialization(); init.setReady(); @@ -113,7 +114,7 @@ public class EdgeIndexClientTest { indexes.reindexAll(); var rsp = client.query(Context.internal(), EdgeSearchSpecification.justIncludes("trapphus")); System.out.println(rsp); - assertEquals(5, rsp.resultsList.get(IndexBlock.Title).get(0).results.get(0).get(0).url.getId()); + assertEquals(5, rsp.resultsList.get(IndexBlock.Title).get(0).results.get(0).get(0).url.id()); } diff --git a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexJournalWriterTest.java b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexJournalWriterTest.java new file mode 100644 index 00000000..39a62033 --- /dev/null +++ b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexJournalWriterTest.java @@ -0,0 +1,76 @@ +package nu.marginalia.wmsa.edge.index.service; + +import lombok.SneakyThrows; +import nu.marginalia.util.multimap.MultimapFileLong; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntryHeader; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalReader; +import nu.marginalia.wmsa.edge.index.model.IndexBlock; +import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter; +import nu.marginalia.wmsa.edge.index.reader.SearchIndexReader; +import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl; +import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; + +class SearchIndexJournalWriterTest { + DictionaryWriter dictionaryWriter; + SearchIndexJournalWriterImpl writer; + + Path indexFile; + Path wordsFile1; + Path urlsFile1; + Path dictionaryFile; + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @BeforeEach @SneakyThrows + void setUp() { + dictionaryFile = Files.createTempFile("tmp", ".dict"); + dictionaryFile.toFile().deleteOnExit(); + + dictionaryWriter = new DictionaryWriter(dictionaryFile.toFile(), 1L<<16, false); + + indexFile = Files.createTempFile("tmp", ".idx"); + indexFile.toFile().deleteOnExit(); + writer = new SearchIndexJournalWriterImpl(dictionaryWriter, indexFile.toFile()); + + wordsFile1 = Files.createTempFile("words1", ".idx"); + urlsFile1 = Files.createTempFile("urls1", ".idx"); + } + + @SneakyThrows + @AfterEach + void tearDown() { + dictionaryWriter.close(); + writer.close(); + indexFile.toFile().delete(); + dictionaryFile.toFile().delete(); + urlsFile1.toFile().delete(); + wordsFile1.toFile().delete(); + } + + @Test + void put() throws IOException { + writer.put(new SearchIndexJournalEntryHeader(4, (1234L << 32) | 5678, IndexBlock.Link), + new SearchIndexJournalEntry(new long[] { 1, 2, 3, 4 })); + writer.put(new SearchIndexJournalEntryHeader(4, (2345L << 32) | 2244, IndexBlock.Words), + new SearchIndexJournalEntry(new long[] { 5, 6, 7 })); + writer.forceWrite(); + + var reader = new SearchIndexJournalReader(MultimapFileLong.forReading(indexFile)); + reader.forEach(entry -> { + logger.info("{}, {} {}", entry, entry.urlId(), entry.domainId()); + logger.info("{}", entry.readEntry().toArray()); + }); + } + +} \ No newline at end of file diff --git a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexWriterTest.java b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexWriterTest.java deleted file mode 100644 index edcfa71f..00000000 --- a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/SearchIndexWriterTest.java +++ /dev/null @@ -1,90 +0,0 @@ -package nu.marginalia.wmsa.edge.index.service; - -import lombok.SneakyThrows; -import nu.marginalia.wmsa.edge.index.conversion.SearchIndexPartitioner; -import nu.marginalia.wmsa.edge.index.model.IndexBlock; -import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter; -import nu.marginalia.wmsa.edge.index.reader.SearchIndex; -import nu.marginalia.wmsa.edge.index.conversion.SearchIndexConverter; -import nu.marginalia.wmsa.edge.index.reader.SearchIndexReader; -import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl; -import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget; -import nu.marginalia.wmsa.edge.model.EdgeId; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.EnumMap; - -import static nu.marginalia.util.dict.DictionaryHashMap.NO_VALUE; -import static org.junit.jupiter.api.Assertions.*; - -class SearchIndexWriterTest { - DictionaryWriter dictionaryWriter; - SearchIndexWriterImpl writer; - - Path indexFile; - Path wordsFile1; - Path urlsFile1; - Path dictionaryFile; - - @BeforeEach @SneakyThrows - void setUp() { - dictionaryFile = Files.createTempFile("tmp", ".dict"); - dictionaryFile.toFile().deleteOnExit(); - - dictionaryWriter = new DictionaryWriter(dictionaryFile.toFile(), 1L<<16, false); - - indexFile = Files.createTempFile("tmp", ".idx"); - indexFile.toFile().deleteOnExit(); - writer = new SearchIndexWriterImpl(dictionaryWriter, indexFile.toFile()); - - wordsFile1 = Files.createTempFile("words1", ".idx"); - urlsFile1 = Files.createTempFile("urls1", ".idx"); - } - - @SneakyThrows - @AfterEach - void tearDown() { - dictionaryWriter.close(); - writer.close(); - indexFile.toFile().delete(); - dictionaryFile.toFile().delete(); - urlsFile1.toFile().delete(); - wordsFile1.toFile().delete(); - } - - public long[] findWord(SearchIndexReader reader, String word, IndexBlock block) { - IndexSearchBudget budget = new IndexSearchBudget(100); - return reader.findWord(block, budget, lv->true, dictionaryWriter.getReadOnly(word)).stream().toArray(); - } - - @Test @SneakyThrows - void put() throws IOException { - writer.put(new EdgeId<>(0), new EdgeId<>(1), IndexBlock.Words, Arrays.asList("Hello", "Salvete", "everyone!", "This", "is", "Bob")); - writer.put(new EdgeId<>(0), new EdgeId<>(2), IndexBlock.Words, Arrays.asList("Salvete", "omnes!", "Bob", "sum", "Hello")); - writer.forceWrite(); - - new SearchIndexConverter(IndexBlock.Words, 0, Path.of("/tmp"), indexFile.toFile(), wordsFile1.toFile(), urlsFile1.toFile(), new SearchIndexPartitioner(null), val -> false); - - EnumMap indices = new EnumMap(IndexBlock.class); - indices.put(IndexBlock.Words, new SearchIndex("0", urlsFile1.toFile(), wordsFile1.toFile())); - - var reader = new SearchIndexReader(indices); - - int bobId = dictionaryWriter.getReadOnly("Bob"); - assertNotEquals(NO_VALUE, bobId); - - assertEquals(2, reader.numHits(IndexBlock.Words, bobId)); - assertArrayEquals(new long[] { 1, 2 }, findWord(reader,"Bob", IndexBlock.Words)); - assertArrayEquals(new long[] { 2 }, findWord(reader,"sum", IndexBlock.Words)); - assertArrayEquals(new long[] { }, findWord(reader,"New Word", IndexBlock.Words)); - - writer.close(); - } - -} \ No newline at end of file diff --git a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/util/RandomWriteFunnelTest.java b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/util/RandomWriteFunnelTest.java index 1780b6bb..8e58b117 100644 --- a/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/util/RandomWriteFunnelTest.java +++ b/marginalia_nu/src/test/java/nu/marginalia/wmsa/edge/index/service/util/RandomWriteFunnelTest.java @@ -67,4 +67,37 @@ class RandomWriteFunnelTest { } } } + + + @Test + public void testYuge() { + new File("/tmp/test.bin").delete(); + for (int j = 1; j <= 20; j++) { + try (var funnel = new RandomWriteFunnel(Path.of("/tmp"), 10, j); + var out = new RandomAccessFile("/tmp/test.bin", "rw")) { + for (int i = 10 - 1; i >= 0; i -= 2) { + funnel.put(i, Long.MAX_VALUE - i); + } + funnel.write(out.getChannel()); + + } catch (Exception e) { + e.printStackTrace(); + } + + try (var in = new RandomAccessFile("/tmp/test.bin", "r")) { + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + in.readLong(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } } \ No newline at end of file diff --git a/third_party/src/main/java/com/upserve/uppend/blobs/NativeIO.java b/third_party/src/main/java/com/upserve/uppend/blobs/NativeIO.java index 80e05c64..0698c5c3 100644 --- a/third_party/src/main/java/com/upserve/uppend/blobs/NativeIO.java +++ b/third_party/src/main/java/com/upserve/uppend/blobs/NativeIO.java @@ -3,18 +3,15 @@ package com.upserve.uppend.blobs; import jnr.ffi.*; import jnr.ffi.types.size_t; -import org.slf4j.Logger; import com.kenai.jffi.MemoryIO; import java.io.IOException; -import java.lang.invoke.MethodHandles; import java.nio.*; // https://github.com/upserve/uppend/blob/70967c6f24d7f1a3bbc18799f485d981da93f53b/src/main/java/com/upserve/uppend/blobs/NativeIO.java // MIT License public class NativeIO { - private static final Logger log = org.slf4j.LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static final NativeC nativeC = LibraryLoader.create(NativeC.class).load("c"); public static final int pageSize = nativeC.getpagesize(); // 4096 on most Linux