Refactoring conversion

vlofgren 2022-06-18 15:54:58 +02:00
parent 63bdc28f79
commit f76af4ca79
37 changed files with 658 additions and 565 deletions

View File

@@ -0,0 +1,31 @@
+package nu.marginalia.util;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ListChunker {
+
+    /** Chops data into a list of lists of max length size
+     *
+     * Caveat: Relies on subList and does not clone "data", so
+     * changes to the original list may affect the sub-lists
+     * in unspecified ways
+     *
+     * @see List#subList
+     */
+    public static <T> List<List<T>> chopList(List<T> data, int size) {
+        if (data.isEmpty())
+            return Collections.emptyList();
+        else if (data.size() < size)
+            return List.of(data);
+
+        final List<List<T>> ret = new ArrayList<>(1 + data.size() / size);
+
+        for (int i = 0; i < data.size(); i+=size) {
+            ret.add(data.subList(i, Math.min(data.size(), i+size)));
+        }
+
+        return ret;
+    }
+}
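For reference, chopList partitions a list into sublist views of at most size elements; a minimal usage sketch (values are illustrative):

    List<Integer> data = List.of(1, 2, 3, 4, 5);
    List<List<Integer>> chunks = ListChunker.chopList(data, 2);
    // chunks => [[1, 2], [3, 4], [5]] -- each chunk is a subList view, not a copy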

View File

@@ -1,6 +1,6 @@
package nu.marginalia.util;

-import io.prometheus.client.Gauge;
+import lombok.SneakyThrows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -18,10 +18,6 @@ import java.nio.file.Path;
 * */
public class RandomWriteFunnel implements AutoCloseable {

-    private final static Gauge write_rate = Gauge.build("wmsa_rwf_write_bytes", "Bytes/s")
-            .register();
-    private final static Gauge transfer_rate = Gauge.build("wmsa_rwf_transfer_bytes", "Bytes/s")
-            .register();
    private static final Logger logger = LoggerFactory.getLogger(RandomWriteFunnel.class);
    private final DataBin[] bins;
@@ -34,7 +30,7 @@ public class RandomWriteFunnel implements AutoCloseable {
            int binCount = (int) (size / binSize + ((size % binSize) != 0L ? 1 : 0));
            bins = new DataBin[binCount];
            for (int i = 0; i < binCount; i++) {
-                bins[i] = new DataBin(tempDir, (int) Math.min(size - binSize * i, binSize));
+                bins[i] = new DataBin(tempDir, Math.min((int) (size - binSize * i), binSize));
            }
        }
        else {
@@ -42,25 +38,25 @@ public class RandomWriteFunnel implements AutoCloseable {
        }
    }

-    public void put(long address, long data) throws IOException {
-        bins[((int)(address / binSize))].put((int)(address%binSize), data);
+    @SneakyThrows
+    public void put(long address, long data) {
+        int bin = (int)(address / binSize);
+        int offset = (int)(address%binSize);
+
+        bins[bin].put(offset, data);
    }

    public void write(FileChannel o) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocateDirect(binSize*8);
-        logger.debug("Writing from RWF");
-        for (int i = 0; i < bins.length; i++) {
-            var bin = bins[i];
+        for (var bin : bins) {
            buffer.clear();
            bin.eval(buffer);

            while (buffer.hasRemaining()) {
-                int wb = o.write(buffer);
-                write_rate.set(wb);
+                o.write(buffer);
            }
        }
-        logger.debug("Done");
    }

    @Override
@@ -84,12 +80,12 @@ public class RandomWriteFunnel implements AutoCloseable {
        }

        void put(int address, long data) throws IOException {
-            buffer.putInt(address);
-            buffer.putLong(data);
-            if (buffer.capacity() - buffer.position() < 12) {
+            if (buffer.remaining() < 12) {
                flushBuffer();
            }
+            buffer.putInt(address);
+            buffer.putLong(data);
        }

        private void flushBuffer() throws IOException {
@@ -97,12 +93,15 @@ public class RandomWriteFunnel implements AutoCloseable {
                return;

            buffer.flip();
-            while (channel.write(buffer) > 0);
+            while (buffer.hasRemaining())
+                channel.write(buffer);
            buffer.clear();
        }

        private void eval(ByteBuffer dest) throws IOException {
            flushBuffer();
+            channel.force(false);

            channel.position(0);
            buffer.clear();
@@ -117,14 +116,17 @@ public class RandomWriteFunnel implements AutoCloseable {
                if (rb < 0) {
                    break;
                }
-                else {
-                    transfer_rate.set(rb);
-                }
                buffer.flip();
                while (buffer.limit() - buffer.position() >= 12) {
-                    int addr = buffer.getInt();
+                    int addr = 8 * buffer.getInt();
                    long data = buffer.getLong();
-                    dest.putLong(8*addr, data);
+
+                    try {
+                        dest.putLong(addr, data);
+                    }
+                    catch (IndexOutOfBoundsException ex) {
+                        logger.info("!!!bad[{}]={}", addr, data);
+                    }
                }
                buffer.compact();
            }
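RandomWriteFunnel buffers scattered (address, value) writes into temp-file bins, then replays them as one sequential pass over the output channel. A rough usage sketch, with illustrative sizes and paths (the constructor arguments are temp dir, size in longs, and bin size, per the call sites later in this commit):

    try (var rwf = new RandomWriteFunnel(tmpFileDir, 1_000_000, 10_000_000);
         var out = new RandomAccessFile("out.dat", "rw")) {
        rwf.put(723_411L, 0xCAFEL);   // random-access writes, in any order
        rwf.put(11L, 0xBABEL);
        rwf.write(out.getChannel());  // replayed sequentially into the file
    }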

View File

@@ -36,9 +36,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
    private long mappedSize;
    final static long WORD_SIZE = 8;

-    private boolean loadAggressively;
-    private final NativeIO.Advice advice = null;
+    private NativeIO.Advice defaultAdvice = null;

    public static MultimapFileLong forReading(Path file) throws IOException {
        long fileSize = Files.size(file);
@@ -70,12 +68,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
                            long mapSize,
                            int bufferSize) throws IOException {
-        this(new RandomAccessFile(file, translateToRAFMode(mode)), mode, mapSize, bufferSize, false);
-    }
-
-    public MultimapFileLong loadAggressively(boolean v) {
-        this.loadAggressively = v;
-        return this;
+        this(new RandomAccessFile(file, translateToRAFMode(mode)), mode, mapSize, bufferSize);
    }

    private static String translateToRAFMode(FileChannel.MapMode mode) {
@@ -91,13 +84,11 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
    public MultimapFileLong(RandomAccessFile file,
                            FileChannel.MapMode mode,
                            long mapSizeBytes,
-                            int bufferSizeWords,
-                            boolean loadAggressively) throws IOException {
+                            int bufferSizeWords) throws IOException {
        this.mode = mode;
        this.bufferSize = bufferSizeWords;
        this.mapSize = mapSizeBytes;
        this.fileLength = file.length();
-        this.loadAggressively = loadAggressively;

        channel = file.getChannel();
        mappedSize = 0;
@@ -115,6 +106,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
    @SneakyThrows
    public void advice(NativeIO.Advice advice) {
+        this.defaultAdvice = advice;
        for (var buffer : mappedByteBuffers) {
            NativeIO.madvise(buffer, advice);
        }
@@ -157,7 +149,7 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
    }

    @SneakyThrows
-    private void grow(long posIdxRequired) {
+    public void grow(long posIdxRequired) {
        if (posIdxRequired*WORD_SIZE > mapSize && mode == READ_ONLY) {
            throw new IndexOutOfBoundsException(posIdxRequired + " (max " + mapSize + ")");
        }
@@ -182,11 +174,8 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
            var buffer = channel.map(mode, posBytes, bzBytes);

-            if (loadAggressively)
-                buffer.load();
-
-            if (advice != null) {
-                NativeIO.madvise(buffer, advice);
+            if (defaultAdvice != null) {
+                NativeIO.madvise(buffer, defaultAdvice);
            }

            buffers.add(buffer.asLongBuffer());
@@ -262,6 +251,32 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
    }

+    @Override
+    public void read(LongBuffer vals, long idx) {
+        int n = vals.limit() - vals.position();
+        if (idx+n >= mappedSize) {
+            grow(idx+n);
+        }
+        int iN = (int)((idx + n) / bufferSize);
+
+        for (int i = 0; i < n; ) {
+            int i0 = (int)((idx + i) / bufferSize);
+
+            int bufferOffset = (int) ((idx+i) % bufferSize);
+            var buffer = buffers.get(i0);
+
+            final int l;
+
+            if (i0 < iN) l = bufferSize - bufferOffset;
+            else l = Math.min(n - i, bufferSize - bufferOffset);
+
+            vals.put(vals.position() + i, buffer, bufferOffset, l);
+            i+=l;
+        }
+    }
+
    @Override
    public void write(long[] vals, long idx) {
        write(vals, vals.length, idx);
@@ -363,8 +378,10 @@ public class MultimapFileLong implements AutoCloseable, MultimapFileLongSlice {
    @Override
    public void close() throws IOException {
        force();

        mappedByteBuffers.clear();
        buffers.clear();

        channel.close();

        // I want to believe

View File

@@ -38,6 +38,9 @@ public class MultimapFileLongOffsetSlice implements MultimapFileLongSlice {
        map.read(vals, n, idx+off);
    }

+    @Override
+    public void read(LongBuffer vals, long idx) { map.read(vals, idx+off); }
+
    @Override
    public void write(long[] vals, long idx) {
        map.write(vals, idx+off);

View File

@@ -15,6 +15,8 @@ public interface MultimapFileLongSlice {
    void read(long[] vals, int n, long idx);

+    void read(LongBuffer vals, long idx);
+
    void write(long[] vals, long idx);
    void write(long[] vals, int n, long idx);

View File

@@ -6,6 +6,7 @@ import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import lombok.SneakyThrows;
+import marcono1234.gson.recordadapter.RecordTypeAdapterFactory;
import nu.marginalia.wmsa.client.exception.LocalException;
import nu.marginalia.wmsa.client.exception.NetworkException;
import nu.marginalia.wmsa.client.exception.RemoteException;
@@ -30,9 +31,12 @@ import java.util.zip.GZIPOutputStream;
public abstract class AbstractClient implements AutoCloseable {
    public static final String CONTEXT_OUTBOUND_REQUEST = "outbound-request";

-    private final Gson gson = new GsonBuilder().create();
+    private final Gson gson = new GsonBuilder()
+            .registerTypeAdapterFactory(RecordTypeAdapterFactory.builder().allowMissingComponentValues().create())
+            .create();

    private final Logger logger = LoggerFactory.getLogger(getClass());
+    private final Marker httpMarker = MarkerFactory.getMarker("HTTP");

    private final OkHttpClient client;

View File

@@ -94,8 +94,6 @@ public class SqlLoadProcessedDocument {
        } catch (SQLException ex) {
            logger.warn("SQL error inserting document", ex);
        }
    }

    public void loadWithError(LoaderData data, List<LoadProcessedDocumentWithError> documents) {

View File

@@ -105,7 +105,7 @@ public class CrawlJobExtractorPageRankMain {
        try (var domainQuery = conn.prepareStatement(specificDomainSqlFromId);
             var urlQuery = conn.prepareStatement(urlsSql))
        {
-            domainQuery.setInt(1, domainId.getId());
+            domainQuery.setInt(1, domainId.id());
            ResultSet rsp = domainQuery.executeQuery();
            domainName = rsp.next() ? rsp.getString(1) : "";
@@ -113,10 +113,10 @@ public class CrawlJobExtractorPageRankMain {
        spec.id = createId(new EdgeDomain(domainName));
        spec.urls = new ArrayList<>(1000);

-        spec.crawlDepth = getCrawlDepth(new DomainWithId(domainName, domainId.getId()));
+        spec.crawlDepth = getCrawlDepth(new DomainWithId(domainName, domainId.id()));

        urlQuery.setString(1, domainName.toString());
-        urlQuery.setInt(2, domainId.getId());
+        urlQuery.setInt(2, domainId.id());
        urlQuery.setFetchSize(1000);
        rsp = urlQuery.executeQuery();

View File

@@ -17,13 +17,8 @@ import nu.marginalia.wmsa.edge.search.model.BrowseResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.sql.Connection;
import java.sql.SQLException;
-import java.sql.Types;
import java.util.*;
-import java.util.function.Function;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;

public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao {
@@ -71,7 +66,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao {
    private <T> String idList(List<EdgeId<T>> ids) {
        StringJoiner j = new StringJoiner(",", "(", ")");
        for (var id : ids) {
-            j.add(Integer.toString(id.getId()));
+            j.add(Integer.toString(id.id()));
        }
        return j.toString();
    }
@@ -154,7 +149,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao {
        try (var connection = dataSource.getConnection()) {
            try (var stmt = connection.prepareStatement(q)) {
                stmt.setFetchSize(count);
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                stmt.setInt(2, count);
                var rsp = stmt.executeQuery();
                while (rsp.next()) {
@@ -183,7 +178,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao {
            try (var stmt = connection.prepareStatement(q2)) {
                stmt.setFetchSize(count/2);
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                stmt.setInt(2, count/2 - domains.size());
                var rsp = stmt.executeQuery();
                while (rsp.next() && domains.size() < count/2) {
@@ -214,7 +209,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao {
                    LIMIT ?""";
            try (var stmt = connection.prepareStatement(q3)) {
                stmt.setFetchSize(count/2);
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                stmt.setInt(2, count/2 - domains.size());
                var rsp = stmt.executeQuery();
@@ -275,7 +270,7 @@ public class EdgeDataStoreDaoImpl implements EdgeDataStoreDao {
        try (var connection = dataSource.getConnection()) {
            try (var stmt = connection.prepareStatement("SELECT DOMAIN_NAME FROM EC_DOMAIN WHERE ID=?")) {
-                stmt.setInt(1, id.getId());
+                stmt.setInt(1, id.id());
                var rsp = stmt.executeQuery();
                if (rsp.next()) {
                    return new EdgeDomain(rsp.getString(1));

View File

@@ -9,7 +9,7 @@ import nu.marginalia.wmsa.edge.model.EdgeId;
public interface EdgeDomainBlacklist {
    boolean isBlacklisted(int domainId);
    default boolean isBlacklisted(EdgeId<EdgeDomain> domainId) {
-        return isBlacklisted(domainId.getId());
+        return isBlacklisted(domainId.id());
    }
    default TIntHashSet getSpamDomains() {
        return new TIntHashSet();

View File

@@ -1,11 +1,9 @@
package nu.marginalia.wmsa.edge.index;

-import nu.marginalia.wmsa.edge.index.EdgeIndexControl;
-import nu.marginalia.wmsa.edge.index.IndexServicesFactory;
import nu.marginalia.wmsa.edge.index.model.EdgeIndexSearchTerms;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.index.reader.SearchIndexReader;
-import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriter;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriter;
import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget;
import nu.marginalia.wmsa.edge.index.reader.query.Query;
import org.jetbrains.annotations.NotNull;
@@ -31,7 +29,7 @@ public class EdgeIndexBucket {
    @NotNull
    private final IndexServicesFactory servicesFactory;
    private final EdgeIndexControl indexControl;
-    private final SearchIndexWriter writer;
+    private final SearchIndexJournalWriter writer;

    private final int id;

View File

@@ -23,7 +23,7 @@ public class EdgeIndexControl {
        for (IndexBlock block : IndexBlock.values()) {
            try {
-                servicesFactory.getIndexConverter(id, block);
+                servicesFactory.convertIndex(id, block);

                System.runFinalization();
                System.gc();
@@ -40,10 +40,6 @@ public class EdgeIndexControl {
        System.gc();
    }

-    public long wordCount(int id) {
-        return servicesFactory.wordCount(id);
-    }
-
    public void switchIndexFiles(int id) throws Exception {
        servicesFactory.switchFilesJob(id).call();
    }

View File

@@ -11,12 +11,17 @@ import gnu.trove.set.hash.TIntHashSet;
import io.prometheus.client.Counter;
import io.prometheus.client.Histogram;
import io.reactivex.rxjava3.schedulers.Schedulers;
+import marcono1234.gson.recordadapter.RecordTypeAdapterFactory;
+import nu.marginalia.util.ListChunker;
import nu.marginalia.wmsa.configuration.server.Initialization;
import nu.marginalia.wmsa.configuration.server.MetricsServer;
import nu.marginalia.wmsa.configuration.server.Service;
+import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntryHeader;
import nu.marginalia.wmsa.edge.index.model.*;
import nu.marginalia.wmsa.edge.index.reader.SearchIndexes;
-import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl;
import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget;
import nu.marginalia.util.dict.DictionaryHashMap;
import nu.marginalia.wmsa.edge.model.*;
@@ -48,8 +53,11 @@ public class EdgeIndexService extends Service {
    @NotNull
    private final Initialization init;
    private final SearchIndexes indexes;
+    private final DictionaryWriter dictionaryWriter;

-    private final Gson gson = new GsonBuilder().create();
+    private final Gson gson = new GsonBuilder()
+            .registerTypeAdapterFactory(RecordTypeAdapterFactory.builder().allowMissingComponentValues().create())
+            .create();

    private static final Histogram wmsa_edge_index_query_time
            = Histogram.build().name("wmsa_edge_index_query_time").help("-").register();
@@ -66,12 +74,13 @@ public class EdgeIndexService extends Service {
                            @Named("service-port") Integer port,
                            Initialization init,
                            MetricsServer metricsServer,
-                            SearchIndexes indexes
-                            ) {
+                            SearchIndexes indexes,
+                            IndexServicesFactory servicesFactory) {
        super(ip, port, init, metricsServer);

        this.init = init;
        this.indexes = indexes;
+        this.dictionaryWriter = servicesFactory.getDictionaryWriter();

        Spark.post("/words/", this::putWords);
        Spark.post("/search/", this::search, gson::toJson);
@@ -173,29 +182,19 @@ public class EdgeIndexService extends Service {
    public void putWords(EdgeId<EdgeDomain> domainId, EdgeId<EdgeUrl> urlId,
                         EdgePageWords words, int idx
                         ) {
-        SearchIndexWriterImpl indexWriter = indexes.getIndexWriter(idx);
+        SearchIndexJournalWriterImpl indexWriter = indexes.getIndexWriter(idx);

-        if (!words.words.isEmpty()) {
-            if (words.size() < 1000) {
-                indexWriter.put(domainId, urlId, words.block, words.words);
-            } else {
-                chunks(words.words, 1000).forEach(chunk -> {
-                    indexWriter.put(domainId, urlId, words.block, chunk);
-                });
-            }
-        }
-    }
+        for (var chunk : ListChunker.chopList(words.words, SearchIndexJournalEntry.MAX_LENGTH)) {

-    private <T> List<List<T>> chunks(Collection<T> coll, int size) {
-        List<List<T>> ret = new ArrayList<>();
-        List<T> data = List.copyOf(coll);
-        for (int i = 0; i < data.size(); i+=size) {
-            ret.add(data.subList(i, Math.min(data.size(), i+size)));
-        }
-        return ret;
+            var entry = new SearchIndexJournalEntry(getWordIds(chunk));
+            var header = new SearchIndexJournalEntryHeader(domainId, urlId, words.block);
+
+            indexWriter.put(header, entry);
+        };
+    }
+
+    private long[] getWordIds(List<String> words) {
+        return words.stream().filter(w -> w.length() < Byte.MAX_VALUE).mapToLong(dictionaryWriter::get).toArray();
    }

    private Object search(Request request, Response response) {
@@ -341,7 +340,7 @@ public class EdgeIndexService extends Service {
                getQuery(i, budget, sq.block, lv -> localFilter.filterRawValue(i, lv), searchTerms)
                        .mapToObj(id -> new EdgeSearchResultItem(i, sq.termSize(), id))
-                        .filter(ri -> !seenResults.contains(ri.url.getId()) && localFilter.test(i, domainCountFilter, ri))
+                        .filter(ri -> !seenResults.contains(ri.url.id()) && localFilter.test(i, domainCountFilter, ri))
                        .limit(specs.limitTotal * 3L)
                        .distinct()
                        .limit(Math.min(specs.limitByBucket
@@ -350,7 +349,7 @@ public class EdgeIndexService extends Service {
            for (var result : resultsForBucket) {
-                seenResults.add(result.url.getId());
+                seenResults.add(result.url.id());
            }
            for (var result : resultsForBucket) {
                for (var searchTerm : sq.searchTermsInclude) {
@@ -401,7 +400,7 @@ public class EdgeIndexService extends Service {
        public boolean filterRawValue(int bucket, long value) {
            var domain = new EdgeId<EdgeDomain>((int)(value >>> 32));

-            if (domain.getId() == Integer.MAX_VALUE) {
+            if (domain.id() == Integer.MAX_VALUE) {
                return true;
            }
@@ -409,11 +408,11 @@ public class EdgeIndexService extends Service {
        }

        long getKey(int bucket, EdgeId<EdgeDomain> id) {
-            return ((long)bucket) << 32 | id.getId();
+            return ((long)bucket) << 32 | id.id();
        }

        public boolean test(int bucket, EdgeSearchResultItem item) {
-            if (item.domain.getId() == Integer.MAX_VALUE) {
+            if (item.domain.id() == Integer.MAX_VALUE) {
                return true;
            }
@@ -431,7 +430,7 @@ public class EdgeIndexService extends Service {
        }

        public boolean test(int bucket, DomainResultCountFilter root, EdgeSearchResultItem item) {
-            if (item.domain.getId() == Integer.MAX_VALUE) {
+            if (item.domain.id() == Integer.MAX_VALUE) {
                return true;
            }
            return root.getCount(bucket, item) + resultsByDomain.adjustOrPutValue(getKey(bucket, item.domain), 1, 1) <= limitByDomain;

View File

@@ -8,7 +8,7 @@ import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist;
import nu.marginalia.wmsa.edge.index.conversion.ConversionUnnecessaryException;
import nu.marginalia.wmsa.edge.index.conversion.SearchIndexConverter;
import nu.marginalia.wmsa.edge.index.conversion.SearchIndexPreconverter;
-import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.index.dictionary.DictionaryReader;
import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter;
@@ -44,7 +44,8 @@ public class IndexServicesFactory {
    private final DoublePartitionedDataFile indexWriteUrlsFile;
    private volatile static DictionaryWriter dictionaryWriter;
    private final Long dictionaryHashMapSize;
-    private final SearchIndexPartitioner partitoner;
+    private final SearchIndexPartitioner partitioner;

    @Inject
    public IndexServicesFactory(
            @Named("tmp-file-dir") Path tmpFileDir,
@@ -59,7 +60,7 @@ public class IndexServicesFactory {
            @Named("edge-index-write-urls-file") String indexWriteUrlsFile,
            @Named("edge-dictionary-hash-map-size") Long dictionaryHashMapSize,
            EdgeDomainBlacklist domainBlacklist,
-            SearchIndexPartitioner partitoner
+            SearchIndexPartitioner partitioner
            ) {

        this.tmpFileDir = tmpFileDir;
@@ -73,11 +74,11 @@ public class IndexServicesFactory {
        this.indexWriteWordsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteWordsFile);
        this.indexWriteUrlsFile = new DoublePartitionedDataFile(partitionRootFast, indexWriteUrlsFile);
        this.preconverterOutputFile = new PartitionedDataFile(partitionRootSlowTmp, "preconverted.dat");
-        this.partitoner = partitoner;
+        this.partitioner = partitioner;
    }

-    public SearchIndexWriterImpl getIndexWriter(int idx) {
-        return new SearchIndexWriterImpl(getDictionaryWriter(), writerIndexFile.get(idx));
+    public SearchIndexJournalWriterImpl getIndexWriter(int idx) {
+        return new SearchIndexJournalWriterImpl(getDictionaryWriter(), writerIndexFile.get(idx));
    }

    public DictionaryWriter getDictionaryWriter() {
@@ -93,15 +94,17 @@ public class IndexServicesFactory {
    }

-    public SearchIndexConverter getIndexConverter(int id, IndexBlock block) throws ConversionUnnecessaryException, IOException {
-        return new SearchIndexConverter(block, id, tmpFileDir,
+    public void convertIndex(int id, IndexBlock block) throws ConversionUnnecessaryException, IOException {
+        var converter = new SearchIndexConverter(block, id, tmpFileDir,
                preconverterOutputFile.get(id),
                indexWriteWordsFile.get(id, block.id),
                indexWriteUrlsFile.get(id, block.id),
-                partitoner,
+                partitioner,
                domainBlacklist
        );
+        converter.convert();
    }

    @SneakyThrows
    public SearchIndexPreconverter getIndexPreconverter() {
        File[] outputFiles = new File[DYNAMIC_BUCKET_LENGTH+1];
@@ -110,7 +113,7 @@ public class IndexServicesFactory {
        }
        return new SearchIndexPreconverter(writerIndexFile.get(0),
                outputFiles,
-                partitoner,
+                partitioner,
                domainBlacklist
        );
    }
@@ -119,10 +122,6 @@ public class IndexServicesFactory {
        return preconverterOutputFile.get(i);
    }

-    public long wordCount(int id) {
-        return SearchIndexConverter.wordCount(writerIndexFile.get(0));
-    }
-
    @SneakyThrows
    public SearchIndexReader getIndexReader(int id) {
        EnumMap<IndexBlock, SearchIndex> indexMap = new EnumMap<>(IndexBlock.class);

View File

@@ -1,331 +1,222 @@
package nu.marginalia.wmsa.edge.index.conversion;

-import com.google.inject.Inject;
-import com.google.inject.name.Named;
-import gnu.trove.set.hash.TIntHashSet;
-import lombok.RequiredArgsConstructor;
-import lombok.SneakyThrows;
import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist;
import nu.marginalia.wmsa.edge.index.conversion.words.WordIndexOffsetsTable;
-import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalReader;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.index.conversion.words.WordsTableWriter;
import nu.marginalia.util.btree.BTreeWriter;
import nu.marginalia.util.btree.model.BTreeContext;
import nu.marginalia.util.multimap.MultimapFileLong;
import nu.marginalia.util.RandomWriteFunnel;
-import nu.marginalia.util.multimap.MultimapSorter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
-import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.util.concurrent.locks.Lock;
+
+import static nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry.MAX_LENGTH;

public class SearchIndexConverter {
-    private static final long FILE_HEADER_SIZE = 12;
-    private static final int CHUNK_HEADER_SIZE = 16;
-
    public static final BTreeContext urlsBTreeContext = new BTreeContext(5, 1, ~0, 8);

-    private final long fileLength;
-    private final long urlsFileSize;
+    private final long[] tmpWordsBuffer = new long[MAX_LENGTH];

    private final Path tmpFileDir;
-    private final FileChannel urlsTmpFileChannel;
-    private final int wordCount;
-    private final MultimapFileLong urlsTmpFileMap;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    private final IndexBlock block;
    private final int bucketId;

-    private final File urlsFile;
+    private final File inputFile;
+    private final File outputFileWords;
+    private final File outputFileUrls;

    private final SearchIndexPartitioner partitioner;
-    private final TIntHashSet spamDomains;
-    private final MultimapSorter urlTmpFileSorter;
+    private final EdgeDomainBlacklist blacklist;

    private final static int internalSortLimit =
            Boolean.getBoolean("small-ram") ? 1024*1024 : 1024*1024*256;

-    @SneakyThrows
-    public static long wordCount(File inputFile) {
-        try (RandomAccessFile raf = new RandomAccessFile(inputFile, "r")) {
-            raf.readLong();
-            return raf.readInt();
-        }
-    }
-
-    @Inject
    public SearchIndexConverter(IndexBlock block,
-                                int bucketId, @Named("tmp-file-dir") Path tmpFileDir,
-                                @Named("edge-writer-page-index-file") File inputFile,
-                                @Named("edge-index-write-words-file") File outputFileWords,
-                                @Named("edge-index-write-urls-file") File outputFileUrls,
+                                int bucketId,
+                                Path tmpFileDir,
+                                File inputFile,
+                                File outputFileWords,
+                                File outputFileUrls,
                                SearchIndexPartitioner partitioner,
                                EdgeDomainBlacklist blacklist)
-            throws ConversionUnnecessaryException, IOException
    {
        this.block = block;
        this.bucketId = bucketId;
        this.tmpFileDir = tmpFileDir;
-        this.urlsFile = outputFileUrls;
+        this.inputFile = inputFile;
+        this.outputFileWords = outputFileWords;
+        this.outputFileUrls = outputFileUrls;
        this.partitioner = partitioner;
-        this.spamDomains = blacklist.getSpamDomains();
-
-        logger.info("Converting {} ({}) {}", block.id, block, inputFile);
+        this.blacklist = blacklist;
+    }

+    public void convert() throws IOException {
        Files.deleteIfExists(outputFileWords.toPath());
        Files.deleteIfExists(outputFileUrls.toPath());

-        final RandomAccessFile raf = new RandomAccessFile(inputFile, "r");
-        this.fileLength = raf.readLong();
-        this.wordCount = raf.readInt();
-
-        if (fileLength <= FILE_HEADER_SIZE) {
-            throw new ConversionUnnecessaryException();
-        }
-
-        var inputChannel = raf.getChannel();
-
-        ByteBuffer buffer = ByteBuffer.allocateDirect(10_000);
-
-        urlsFileSize = getUrlsSize(buffer, inputChannel);
-
-        var tmpUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat");
-        var urlsTmpFileRaf = new RandomAccessFile(tmpUrlsFile.toFile(), "rw");
-        urlsTmpFileChannel = urlsTmpFileRaf.getChannel();
-        urlsTmpFileMap = new MultimapFileLong(urlsTmpFileRaf, FileChannel.MapMode.READ_WRITE, urlsFileSize, 8*1024*1024, false);
-        urlTmpFileSorter = urlsTmpFileMap.createSorter(tmpFileDir, internalSortLimit);
-
-        logger.info("Creating word index table {} for block {} ({})", outputFileWords, block.id, block);
-        WordIndexOffsetsTable wordIndexTable = createWordIndexTable(outputFileWords, inputChannel);
-
-        logger.info("Creating word urls table {} for block {} ({})", outputFileUrls, block.id, block);
-        createUrlTable(buffer, raf, wordIndexTable);
-
-        Files.delete(tmpUrlsFile);
-
-        raf.close();
-        urlsTmpFileChannel.close();
-        urlsTmpFileMap.force();
-    }
-
-    private boolean isUrlAllowed(long url) {
-        return !spamDomains.contains((int)(url >>> 32));
-    }
-
-    public long translateUrl(long url) {
-        int domainId = partitioner.translateId(bucketId, (int) (url >>> 32));
-        return ((long)domainId << 32) | (url & 0xFFFFFFFFL);
-    }
-
-    private long getUrlsSize(ByteBuffer buffer, FileChannel channel) throws IOException {
-        channel.position(FILE_HEADER_SIZE);
-
-        var reader = new IndexReader(buffer, channel) {
-            public long size;
-
-            @Override
-            public void eachWord(long urlId, int wordId) {
-                size++;
-            }
-        };
-
-        reader.read();
-
-        logger.info("Blacklist filtered {} URLs", reader.filtered);
-        logger.debug("URLs Size {} Mb", channel.position()/(1024*1024));
-
-        return reader.size;
-    }
-
-    private void createUrlTable(ByteBuffer buffer, RandomAccessFile raf, WordIndexOffsetsTable wordOffsetsTable) throws IOException {
-        logger.info("Table size = {}", wordOffsetsTable.length());
-
-        raf.seek(FILE_HEADER_SIZE);
-
-        var channel = raf.getChannel();
-
-        try (RandomWriteFunnel rwf = new RandomWriteFunnel(tmpFileDir, urlsFileSize, 10_000_000)) {
-            int[] wordWriteOffset = new int[wordOffsetsTable.length()];
-
-            new IndexReader(buffer, channel) {
-                @Override
-                public void eachWord(long urlId, int wordId) throws IOException {
-                    if (wordId >= wordWriteOffset.length)
-                        return;
-
-                    if (wordId > 0) {
-                        rwf.put(wordOffsetsTable.get(wordId - 1) + wordWriteOffset[wordId]++, translateUrl(urlId));
-                    } else {
-                        rwf.put(wordWriteOffset[wordId]++, translateUrl(urlId));
-                    }
-                }
-            }.read();
-
-            rwf.write(urlsTmpFileChannel);
-        }
-
-        urlsTmpFileChannel.force(false);
-        logger.info("URL TMP Table: {} Mb", channel.position()/(1024*1024));
-
-        if (wordOffsetsTable.length() > 0) {
-            logger.info("Sorting urls table");
-            wordOffsetsTable.forEach(urlTmpFileSorter::sort);
-            urlsTmpFileMap.force();
-        }
-        else {
-            logger.warn("urls table empty -- nothing to sort");
-        }
-
-        logger.info("Writing BTree");
-        try (var urlsFileMap = MultimapFileLong.forOutput(urlsFile.toPath(), 1024)) {
-            var writer = new BTreeWriter(urlsFileMap, urlsBTreeContext);
-
-            wordOffsetsTable.fold((accumulatorIdx, start, length) -> {
-                // Note: The return value is accumulated into accumulatorIdx!
-                return writer.write(accumulatorIdx, length,
-                        slice -> slice.transferFromFileChannel(urlsTmpFileChannel, 0, start, start + length));
-            });
-        } catch (Exception e) {
-            logger.error("Error while writing BTree", e);
-        }
-    }
-
-    private WordIndexOffsetsTable createWordIndexTable(File outputFileWords, FileChannel inputChannel) throws IOException {
-        inputChannel.position(FILE_HEADER_SIZE);
-
-        logger.debug("Table size = {}", wordCount);
-        WordsTableWriter wordsTableWriter = new WordsTableWriter(wordCount);
-        ByteBuffer buffer = ByteBuffer.allocateDirect(8* SearchIndexWriterImpl.MAX_BLOCK_SIZE);
-
-        logger.debug("Reading words");
-
-        var reader = new IndexReader(buffer, inputChannel) {
-            @Override
-            public void eachWord(long urlId, int wordId) {
-                wordsTableWriter.acceptWord(wordId);
-            }
-        };
-        reader.read();
-
-        logger.debug("Rearranging table");
-
-        inputChannel.position(FILE_HEADER_SIZE);
-
-        wordsTableWriter.write(outputFileWords);
-
-        return wordsTableWriter.getTable();
-    }
-
-    @RequiredArgsConstructor
-    private class IndexReader {
-        private final ByteBuffer buffer;
-        private final FileChannel channel;
-        public long filtered;
-
-        public void read() throws IOException {
-            var lock = partitioner.getReadLock();
-            try {
-                lock.lock();
-                outer:
-                while (channel.position() < fileLength) {
-                    buffer.clear();
-                    buffer.limit(CHUNK_HEADER_SIZE);
-                    channel.read(buffer);
-                    buffer.flip();
-                    long urlId = buffer.getLong();
-                    int chunkBlock = buffer.getInt();
-                    int count = buffer.getInt();
-                    if (count > 1000) {
-                        int tries = 0;
-                        logger.warn("Terminating garbage @{}b, attempting repair", channel.position());
-                        for (; ; ) {
-                            tries++;
-                            long p = channel.position();
-                            buffer.clear();
-                            buffer.limit(8);
-                            if (channel.read(buffer) != 8) {
-                                break outer; // EOF...?
-                            }
-                            buffer.flip();
-                            int pcb = buffer.getInt();
-                            int pct = buffer.getInt();
-                            if (pcb == 0 || pcb == 1 && pct >= 0 && pct <= 1000) {
-                                chunkBlock = pcb;
-                                count = pct;
-                                break;
-                            } else {
-                                channel.position(p + 1);
-                            }
-                        }
-                        logger.warn("Skipped {}b", tries);
-                    }
-
-                    buffer.clear();
-                    buffer.limit(count * 4);
-
-                    int trb = 0;
-                    while (trb < count * 4) {
-                        int rb = channel.read(buffer);
-                        if (rb <= 0) {
-                            throw new ArrayIndexOutOfBoundsException(trb + " - " + count * 4 + " " + rb);
-                        }
-                        trb += rb;
-                    }
-
-                    buffer.flip();
-
-                    if (isUrlAllowed(urlId)) {
-                        if (block.id == chunkBlock) {
-                            eachUrl(lock, count, urlId);
-                        }
-                    } else {
-                        filtered++;
-                    }
-                }
-            }
-            finally {
-                lock.unlock();
-            }
-        }
-
-        public void eachUrl(Lock lock, int count, long urlId) throws IOException {
-            for (int i = 0; i < count; i++) {
-                int wordId = buffer.getInt();
-                if (acceptWord(lock, urlId)) {
-                    eachWord(urlId, wordId);
-                }
-            }
-        }
-
-        public void eachWord(long urlId, int wordId) throws IOException {
-        }
-
-        boolean acceptWord(Lock lock, long urlId) {
-            int domainId = (int) (urlId >>> 32L);
-
-            if (!partitioner.filterUnsafe(lock, domainId, bucketId)) {
-                return false;
-            }
-
-            return true;
-        }
-    }
+        SearchIndexJournalReader journalReader = new SearchIndexJournalReader(MultimapFileLong.forReading(inputFile.toPath()));
+
+        if (journalReader.fileHeader.fileSize() <= SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES) {
+            return;
+        }
+
+        logger.info("Converting {} ({}) {} {}", block.id, block, inputFile, journalReader.fileHeader);
+
+        var lock = partitioner.getReadLock();
+        try {
+            lock.lock();
+
+            var tmpUrlsFile = Files.createTempFile(tmpFileDir, "urls-sorted", ".dat");
+
+            logger.info("Creating word index table {} for block {} ({})", outputFileWords, block.id, block);
+            WordIndexOffsetsTable wordIndexTable = createWordIndexTable(journalReader, outputFileWords);
+
+            logger.info("Creating word urls table {} for block {} ({})", outputFileUrls, block.id, block);
+            createUrlTable(journalReader, tmpUrlsFile, wordIndexTable);
+
+            Files.delete(tmpUrlsFile);
+        }
+        finally {
+            lock.unlock();
+        }
+    }
+
+    private WordIndexOffsetsTable createWordIndexTable(SearchIndexJournalReader journalReader,
+                                                       File outputFileWords) throws IOException
+    {
+        final int topWord = (int) journalReader.fileHeader.wordCount();
+
+        logger.debug("Table size = {}", topWord);
+        WordsTableWriter wordsTableWriter = new WordsTableWriter(topWord);
+
+        logger.debug("Reading words");
+
+        for (var entry : journalReader) {
+            if (!isRelevantEntry(entry)) {
+                continue;
+            }
+
+            final SearchIndexJournalEntry entryData = entry.readEntryUsingBuffer(tmpWordsBuffer);
+
+            for (int i = 0; i < entryData.size(); i++) {
+                int wordId = (int) entryData.get(i);
+                if (wordId < 0 || wordId >= topWord) {
+                    logger.warn("Bad wordId {}", wordId);
+                }
+                wordsTableWriter.acceptWord(wordId);
+            }
+        }
+
+        logger.debug("Rearranging table");
+
+        wordsTableWriter.write(outputFileWords);
+
+        return wordsTableWriter.getTable();
+    }
+
+    private void createUrlTable(SearchIndexJournalReader journalReader,
+                                Path tmpUrlsFile,
+                                WordIndexOffsetsTable wordOffsetsTable) throws IOException
+    {
+        logger.info("Table size = {}", wordOffsetsTable.length());
+
+        long numberOfWordsTotal = 0;
+        for (var entry : journalReader) {
+            if (isRelevantEntry(entry))
+                numberOfWordsTotal += entry.wordCount();
+        }
+
+        try (RandomAccessFile urlsTmpFileRAF = new RandomAccessFile(tmpUrlsFile.toFile(), "rw");
+             FileChannel urlsTmpFileChannel = urlsTmpFileRAF.getChannel()) {
+
+            try (RandomWriteFunnel rwf = new RandomWriteFunnel(tmpFileDir, numberOfWordsTotal, 10_000_000)) {
+                int[] wordWriteOffset = new int[wordOffsetsTable.length()];
+
+                for (var entry : journalReader) {
+                    if (!isRelevantEntry(entry)) continue;
+
+                    var entryData = entry.readEntryUsingBuffer(tmpWordsBuffer);
+
+                    for (int i = 0; i < entryData.size(); i++) {
+                        int wordId = (int) entryData.get(i);
+
+                        if (wordId >= wordWriteOffset.length)
+                            continue;
+                        if (wordId < 0) {
+                            logger.warn("Negative wordId {}", wordId);
+                        }
+
+                        final long urlInternal = translateUrl(entry.docId());
+                        if (wordId > 0) {
+                            rwf.put(wordOffsetsTable.get(wordId - 1) + wordWriteOffset[wordId]++, urlInternal);
+                        } else {
+                            rwf.put(wordWriteOffset[wordId]++, urlInternal);
+                        }
+                    }
+                }
+
+                rwf.write(urlsTmpFileChannel);
+            }
+
+            urlsTmpFileChannel.force(false);
+
+            try (var urlsTmpFileMap = MultimapFileLong.forOutput(tmpUrlsFile, numberOfWordsTotal)) {
+                if (wordOffsetsTable.length() > 0) {
+                    logger.info("Sorting urls table");
+
+                    var urlTmpFileSorter = urlsTmpFileMap.createSorter(tmpFileDir, internalSortLimit);
+
+                    wordOffsetsTable.forEachRange(urlTmpFileSorter::sort);
+
+                    urlsTmpFileMap.force();
+                } else {
+                    logger.warn("urls table empty -- nothing to sort");
+                }
+            }
+
+            logger.info("Writing BTree");
+
+            try (var urlsFileMap = MultimapFileLong.forOutput(outputFileUrls.toPath(), numberOfWordsTotal)) {
+                var writer = new BTreeWriter(urlsFileMap, urlsBTreeContext);
+
+                wordOffsetsTable.foldRanges((accumulatorIdx, start, length) -> {
+                    // Note: The return value is accumulated into accumulatorIdx!
+                    return writer.write(accumulatorIdx, length,
+                            slice -> slice.transferFromFileChannel(urlsTmpFileChannel, 0, start, start + length));
+                });
+            } catch (Exception e) {
+                logger.error("Error while writing BTree", e);
+            }
+        }
+    }
+
+    private long translateUrl(long url) {
+        int domainId = partitioner.translateId(bucketId, (int) (url >>> 32));
+        return ((long)domainId << 32) | (url & 0xFFFFFFFFL);
+    }
+
+    private boolean isRelevantEntry(SearchIndexJournalReader.JournalEntry entry) {
+        return block.equals(entry.header.block())
+                && !blacklist.isBlacklisted(entry.domainId())
+                && partitioner.filterUnsafe(entry.domainId(), bucketId);
+    }
}

View File

@@ -122,7 +122,7 @@ public class SearchIndexPartitioner {
    public Lock getReadLock() {
        return rwl.readLock();
    }
-    public boolean filterUnsafe(Lock lock, int domainId, int bucketId) {
+    public boolean filterUnsafe(int domainId, int bucketId) {
        return partitionSet.test(domainId, bucketId);
    }

View File

@@ -3,7 +3,9 @@ package nu.marginalia.wmsa.edge.index.conversion;
import com.google.inject.Inject;
import gnu.trove.set.hash.TIntHashSet;
import lombok.SneakyThrows;
+import nu.marginalia.util.multimap.MultimapFileLong;
import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -46,23 +48,16 @@ public class SearchIndexPreconverter {
            }
        }

-        final RandomAccessFile raf = new RandomAccessFile(inputFile, "r");
-        var fileLength = raf.readLong();
-        var wordCount = raf.readInt();
-        final int wordCountOriginal = wordCount;
+        SearchIndexJournalReader indexJournalReader = new SearchIndexJournalReader(MultimapFileLong.forReading(inputFile.toPath()));
+        final long wordCountOriginal = indexJournalReader.fileHeader.wordCount();

-        logger.info("Word Count: {}", wordCount);
-        logger.info("File Length: {}", fileLength);
-
-        var channel = raf.getChannel();
-        ByteBuffer inByteBuffer = ByteBuffer.allocateDirect(10_000);
+        logger.info("{}", indexJournalReader.fileHeader);

        RandomAccessFile[] randomAccessFiles = new RandomAccessFile[outputFiles.length];
        for (int i = 0; i < randomAccessFiles.length; i++) {
            randomAccessFiles[i] = new RandomAccessFile(outputFiles[i], "rw");
-            randomAccessFiles[i].seek(12);
+            randomAccessFiles[i].seek(SearchIndexJournalReader.FILE_HEADER_SIZE_BYTES);
        }
        FileChannel[] fileChannels = new FileChannel[outputFiles.length];
        for (int i = 0; i < fileChannels.length; i++) {
@@ -73,33 +68,24 @@ public class SearchIndexPreconverter {
        var lock = partitioner.getReadLock();
        try {
            lock.lock();
+            ByteBuffer buffer = ByteBuffer.allocateDirect(8192);

-            while (channel.position() < fileLength) {
-                inByteBuffer.clear();
-                inByteBuffer.limit(CHUNK_HEADER_SIZE);
-                channel.read(inByteBuffer);
-                inByteBuffer.flip();
-                long urlId = inByteBuffer.getLong();
-                int chunkBlock = inByteBuffer.getInt();
-                int count = inByteBuffer.getInt();
-                // inByteBuffer.clear();
-                inByteBuffer.limit(count * 4 + CHUNK_HEADER_SIZE);
-                channel.read(inByteBuffer);
-                inByteBuffer.position(CHUNK_HEADER_SIZE);
-
-                for (int i = 0; i < count; i++) {
-                    wordCount = Math.max(wordCount, 1 + inByteBuffer.getInt());
-                }
-
-                inByteBuffer.position(count * 4 + CHUNK_HEADER_SIZE);
+            for (var entry : indexJournalReader) {
+                if (!partitioner.isGoodUrl(entry.urlId())
+                    || spamDomains.contains(entry.domainId())) {
+                    continue;
+                }

-                if (isUrlAllowed(urlId)) {
-                    for (int i = 0; i < randomAccessFiles.length; i++) {
-                        if (partitioner.filterUnsafe(lock, (int) (urlId >>> 32L), i)) {
-                            inByteBuffer.flip();
-                            fileChannels[i].write(inByteBuffer);
-                        }
+                int domainId = entry.domainId();
+                buffer.clear();
+                entry.copyToBuffer(buffer);
+
+                for (int i = 0; i < randomAccessFiles.length; i++) {
+                    if (partitioner.filterUnsafe(domainId, i)) {
+                        buffer.flip();
+                        while (buffer.position() < buffer.limit())
+                            fileChannels[i].write(buffer);
                    }
                }
            }
@@ -108,27 +94,16 @@ public class SearchIndexPreconverter {
            lock.unlock();
        }

-        if (wordCountOriginal < wordCount) {
-            logger.warn("Raised word count {} => {}", wordCountOriginal, wordCount);
-        }
-
        for (int i = 0; i < randomAccessFiles.length; i++) {
            long pos = randomAccessFiles[i].getFilePointer();
            randomAccessFiles[i].seek(0);
            randomAccessFiles[i].writeLong(pos);
-            randomAccessFiles[i].writeInt(wordCount);
+            randomAccessFiles[i].writeLong(wordCountOriginal);
            fileChannels[i].force(true);
            fileChannels[i].close();
            randomAccessFiles[i].close();
        }
    }
-
-    private boolean isUrlAllowed(long url) {
-        int urlId = (int)(url & 0xFFFF_FFFFL);
-        int domainId = (int)(url >>> 32);
-        return partitioner.isGoodUrl(urlId) && !spamDomains.contains(domainId);
-    }
}

View File

@@ -16,7 +16,7 @@ public class WordIndexOffsetsTable {
        return table.length;
    }

-    public void forEach(OffsetTableEntryConsumer o) throws IOException {
+    public void forEachRange(OffsetTableEntryConsumer o) throws IOException {
        if (table[0] > 0) {
            o.accept(0, (int) table[0]);
        }
@@ -32,9 +32,9 @@ public class WordIndexOffsetsTable {
    }

    /**
-     * Fold over each span in the file, left to right
+     * Fold over each span in the file, left to right, accumulating the return value
     */
-    public long fold(OffsetTableEntryFoldConsumer o) throws IOException {
+    public long foldRanges(OffsetTableEntryFoldConsumer o) throws IOException {
        long total = 0;

        if (table[0] > 0) {
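The accumulate-and-fold contract is what lets the converter lay out one BTree per word range back to back: each callback returns the size it consumed, and foldRanges feeds the running total back in as accumulatorIdx, so every range starts where the previous one ended. A hedged caller sketch (writeRange is a hypothetical helper standing in for BTreeWriter.write):

    long totalWritten = wordOffsetsTable.foldRanges((accumulatorIdx, start, length) ->
            writeRange(accumulatorIdx, start, length)); // return value is accumulated into the next accumulatorIdx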

View File

@@ -0,0 +1,49 @@
+package nu.marginalia.wmsa.edge.index.journal;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+public class SearchIndexJournalEntry {
+    private final int size;
+    private final long[] underlyingArray;
+
+    public static final int MAX_LENGTH = 1000;
+
+    public SearchIndexJournalEntry(long[] underlyingArray) {
+        this.size = underlyingArray.length;
+        this.underlyingArray = underlyingArray;
+    }
+
+    public SearchIndexJournalEntry(int size, long[] underlyingArray) {
+        this.size = size;
+        this.underlyingArray = underlyingArray;
+    }
+
+    public void write(ByteBuffer buffer) {
+        for (int i = 0; i < size; i++) {
+            buffer.putLong(underlyingArray[i]);
+        }
+    }
+
+    public long get(int idx) {
+        if (idx >= size)
+            throw new ArrayIndexOutOfBoundsException();
+        return underlyingArray[idx];
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public long[] toArray() {
+        if (size == underlyingArray.length)
+            return underlyingArray;
+        else
+            return Arrays.copyOf(underlyingArray, size);
+    }
+
+    public String toString() {
+        return String.format("%s[%s]", getClass().getSimpleName(), Arrays.toString(toArray()));
+    }
+}
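An entry is just a length-capped view over an array of word ids; a small usage sketch (values are illustrative):

    var entry = new SearchIndexJournalEntry(new long[] { 14, 92, 771 });
    entry.size();     // 3
    entry.get(1);     // 92
    entry.toString(); // SearchIndexJournalEntry[[14, 92, 771]]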

View File

@@ -0,0 +1,16 @@
+package nu.marginalia.wmsa.edge.index.journal;
+
+import nu.marginalia.wmsa.edge.index.model.IndexBlock;
+import nu.marginalia.wmsa.edge.model.EdgeDomain;
+import nu.marginalia.wmsa.edge.model.EdgeId;
+import nu.marginalia.wmsa.edge.model.EdgeUrl;
+
+public record SearchIndexJournalEntryHeader(int entrySize, long documentId, IndexBlock block) {
+
+    public static final int HEADER_SIZE_LONGS = 2;
+
+    public SearchIndexJournalEntryHeader(EdgeId<EdgeDomain> domainId, EdgeId<EdgeUrl> urlId, IndexBlock block) {
+        this(-1, (long) domainId.id() << 32 | urlId.id(), block);
+    }
+}
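The convenience constructor packs both ids into the 64-bit documentId, which the reader's JournalEntry later unpacks; in sketch form:

    long documentId = (long) domainId << 32 | urlId;   // high word: domain, low word: url
    int domain = (int) (documentId >>> 32);
    int url    = (int) (documentId & 0xFFFF_FFFFL);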

View File

@@ -0,0 +1,4 @@
+package nu.marginalia.wmsa.edge.index.journal;
+
+public record SearchIndexJournalFileHeader(long fileSize, long wordCount) {
+}

View File

@ -0,0 +1,123 @@
package nu.marginalia.wmsa.edge.index.journal;

import com.upserve.uppend.blobs.NativeIO;
import nu.marginalia.util.multimap.MultimapFileLong;
import nu.marginalia.util.multimap.MultimapFileLongSlice;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import org.jetbrains.annotations.NotNull;

import java.nio.ByteBuffer;
import java.util.Iterator;

import static nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntryHeader.HEADER_SIZE_LONGS;

public class SearchIndexJournalReader implements Iterable<SearchIndexJournalReader.JournalEntry> {
    public static final long FILE_HEADER_SIZE_LONGS = 2;
    public static final long FILE_HEADER_SIZE_BYTES = 8 * FILE_HEADER_SIZE_LONGS;

    public final SearchIndexJournalFileHeader fileHeader;

    private final MultimapFileLongSlice map;
    private final long committedSize;

    public SearchIndexJournalReader(MultimapFileLong map) {
        fileHeader = new SearchIndexJournalFileHeader(map.get(0), map.get(1));
        committedSize = map.get(0) / 8 - FILE_HEADER_SIZE_LONGS;

        map.advice(NativeIO.Advice.Sequential);

        this.map = map.atOffset(FILE_HEADER_SIZE_LONGS);
    }

    @NotNull
    @Override
    public Iterator<JournalEntry> iterator() {
        return new JournalEntryIterator();
    }

    private class JournalEntryIterator implements Iterator<JournalEntry> {
        private JournalEntry entry;

        @Override
        public boolean hasNext() {
            if (entry == null) {
                return committedSize > 0;
            }
            return entry.hasNext();
        }

        @Override
        public JournalEntry next() {
            if (entry == null) {
                entry = new JournalEntry(0);
            }
            else {
                entry = entry.next();
            }
            return entry;
        }
    }

    public class JournalEntry {
        private final long offset;
        public final SearchIndexJournalEntryHeader header;

        JournalEntry(long offset) {
            final long sizeBlock = map.get(offset);
            final long docId = map.get(offset + 1);

            this.offset = offset;
            this.header = new SearchIndexJournalEntryHeader(
                    (int) (sizeBlock >>> 32L),
                    docId,
                    IndexBlock.byId((int) (sizeBlock & 0xFFFF_FFFFL)));
        }

        public boolean hasNext() {
            return nextId() < committedSize;
        }

        public long docId() {
            return header.documentId();
        }

        public int domainId() {
            return (int) (docId() >>> 32L);
        }

        public int urlId() {
            return (int) (docId() & 0xFFFF_FFFFL);
        }

        public IndexBlock block() {
            return header.block();
        }

        public int wordCount() { return header.entrySize(); }

        public SearchIndexJournalEntry readEntry() {
            long[] dest = new long[header.entrySize()];
            map.read(dest, offset + HEADER_SIZE_LONGS);
            return new SearchIndexJournalEntry(header.entrySize(), dest);
        }

        public SearchIndexJournalEntry readEntryUsingBuffer(long[] dest) {
            if (dest.length >= header.entrySize()) {
                map.read(dest, header.entrySize(), offset + HEADER_SIZE_LONGS);
                return new SearchIndexJournalEntry(header.entrySize(), dest);
            }
            else {
                return readEntry();
            }
        }

        public long nextId() {
            return offset + HEADER_SIZE_LONGS + header.entrySize();
        }

        public JournalEntry next() { return new JournalEntry(nextId()); }

        public void copyToBuffer(ByteBuffer buffer) {
            var dest = buffer.asLongBuffer();
            dest.position(buffer.position() * 8);
            dest.limit(buffer.position() * 8 + header.entrySize() + HEADER_SIZE_LONGS);
            map.read(dest, offset);
            buffer.position(dest.limit() * 8);
        }
    }
}
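A minimal usage sketch for the reader, assuming a journal produced by the writer below; the path is illustrative, and MultimapFileLong.forReading is used the same way in the test at the end of this commit:

    // Sketch: walk a journal file and print each entry's decoded fields.
    var reader = new SearchIndexJournalReader(
            MultimapFileLong.forReading(Path.of("/tmp/index.dat")));

    for (var entry : reader) {
        System.out.printf("domain=%d url=%d block=%s words=%d%n",
                entry.domainId(), entry.urlId(), entry.block(), entry.wordCount());
    }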

View File

@ -0,0 +1,10 @@
package nu.marginalia.wmsa.edge.index.journal;

public interface SearchIndexJournalWriter {
    void put(SearchIndexJournalEntryHeader header, SearchIndexJournalEntry entry);

    void forceWrite();

    void flushWords();
}

View File

@ -3,11 +3,7 @@ package nu.marginalia.wmsa.edge.index.journal;
 import io.reactivex.rxjava3.disposables.Disposable;
 import io.reactivex.rxjava3.schedulers.Schedulers;
 import lombok.SneakyThrows;
-import nu.marginalia.wmsa.edge.index.model.IndexBlock;
 import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter;
-import nu.marginalia.wmsa.edge.model.EdgeDomain;
-import nu.marginalia.wmsa.edge.model.EdgeId;
-import nu.marginalia.wmsa.edge.model.EdgeUrl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@ -17,10 +13,9 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
-import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-public class SearchIndexWriterImpl implements SearchIndexWriter {
+public class SearchIndexJournalWriterImpl implements SearchIndexJournalWriter {
     private final DictionaryWriter dictionaryWriter;
     private final Logger logger = LoggerFactory.getLogger(getClass());
@ -28,12 +23,12 @@ public class SearchIndexWriterImpl implements SearchIndexWriter {
     private RandomAccessFile raf;
     private FileChannel channel;
 
-    public static final int MAX_BLOCK_SIZE = 1000*32*8*4;
+    public static final int MAX_BLOCK_SIZE = SearchIndexJournalEntry.MAX_LENGTH*32*8*4;
     private final ByteBuffer byteBuffer;
     private long pos;
 
     @SneakyThrows
-    public SearchIndexWriterImpl(DictionaryWriter dictionaryWriter, File indexFile) {
+    public SearchIndexJournalWriterImpl(DictionaryWriter dictionaryWriter, File indexFile) {
         this.dictionaryWriter = dictionaryWriter;
 
         initializeIndexFile(indexFile);
@ -61,23 +56,16 @@ public class SearchIndexWriterImpl implements SearchIndexWriter {
     @Override
     @SneakyThrows
-    public synchronized void put(EdgeId<EdgeDomain> domainId, EdgeId<EdgeUrl> urlId, IndexBlock block, List<String> wordsSuspect) {
-        int numGoodWords = 0;
-        for (String word : wordsSuspect) {
-            if (word.length() < Byte.MAX_VALUE) numGoodWords++;
-        }
-
+    public synchronized void put(SearchIndexJournalEntryHeader header, SearchIndexJournalEntry entryData) {
         byteBuffer.clear();
 
-        long url_id = ((long) domainId.getId() << 32) | urlId.getId();
-        byteBuffer.putLong(url_id);
-        byteBuffer.putInt(block.id);
-        byteBuffer.putInt(numGoodWords);
-
-        for (String word : wordsSuspect) {
-            if (word.length() < Byte.MAX_VALUE) {
-                byteBuffer.putInt(dictionaryWriter.get(word));
-            }
-        }
+        byteBuffer.putInt(entryData.size());
+        byteBuffer.putInt(header.block().id);
+        byteBuffer.putLong(header.documentId());
+        entryData.write(byteBuffer);
 
         byteBuffer.limit(byteBuffer.position());
         byteBuffer.rewind();
@ -104,11 +92,11 @@ public class SearchIndexWriterImpl implements SearchIndexWriter {
     }
 
     private void writePositionMarker() throws IOException {
-        var lock = channel.lock(0, 12, false);
+        var lock = channel.lock(0, 16, false);
         pos = channel.size();
         raf.seek(0);
         raf.writeLong(pos);
-        raf.writeInt(dictionaryWriter.size());
+        raf.writeLong(dictionaryWriter.size());
         raf.seek(pos);
         lock.release();
     }
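The widened lock region (12 to 16 bytes) follows from writeInt becoming writeLong: the file header is now two longs, matching SearchIndexJournalFileHeader. Piecing together put() and the reader above, the on-disk layout appears to be (a sketch, offsets in bytes, big-endian ByteBuffer defaults assumed; not authoritative):

    file header:  [0..8)  fileSize (long)      [8..16)  wordCount (long)
    each entry:   [0..4)  entrySize (int)      [4..8)   block id (int)
                  [8..16) documentId (long)    [16..)   entrySize longs of word ids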

View File

@ -1,16 +0,0 @@
package nu.marginalia.wmsa.edge.index.journal;

import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.EdgeId;
import nu.marginalia.wmsa.edge.model.EdgeUrl;

import java.util.List;

public interface SearchIndexWriter {
    void put(EdgeId<EdgeDomain> domainId, EdgeId<EdgeUrl> urlId, IndexBlock block, List<String> words);

    void forceWrite();

    void flushWords();
}

View File

@ -7,7 +7,7 @@ import nu.marginalia.wmsa.edge.index.IndexServicesFactory;
 import nu.marginalia.wmsa.edge.index.conversion.SearchIndexPartitioner;
 import nu.marginalia.wmsa.edge.index.EdgeIndexBucket;
 import nu.marginalia.wmsa.edge.index.dictionary.DictionaryReader;
-import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl;
+import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@ -27,8 +27,8 @@ public class SearchIndexes {
     private final ReentrantLock opsLock = new ReentrantLock(false);
 
-    private final SearchIndexWriterImpl primaryIndexWriter;
-    private final SearchIndexWriterImpl secondaryIndexWriter;
+    private final SearchIndexJournalWriterImpl primaryIndexWriter;
+    private final SearchIndexJournalWriterImpl secondaryIndexWriter;
     private DictionaryReader dictionaryReader = null;
 
     @Inject
@ -134,7 +134,7 @@ public class SearchIndexes {
         }
     }
 
-    public SearchIndexWriterImpl getIndexWriter(int idx) {
+    public SearchIndexJournalWriterImpl getIndexWriter(int idx) {
         if (idx == 0) {
             return primaryIndexWriter;
         }

View File

@ -1,15 +1,10 @@
 package nu.marginalia.wmsa.edge.model;
 
-import lombok.AllArgsConstructor;
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-
-/** This exists entirely for strengthening the typing of IDs
+/**
+ * This exists entirely for strengthening the typing of IDs
  *
  * @param <T>
  */
-@AllArgsConstructor @Getter @EqualsAndHashCode @ToString
-public class EdgeId<T> {
-    private final int id;
+public record EdgeId<T>(int id) {
 }
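The record keeps the value equality, hashing and toString the Lombok annotations provided, but renames the accessor from getId() to id(), which accounts for the long tail of call-site changes below. A quick illustration (not part of the commit):

    var a = new EdgeId<EdgeDomain>(5);
    var b = new EdgeId<EdgeDomain>(5);

    assert a.equals(b);    // records generate equals/hashCode
    assert a.id() == 5;    // formerly a.getId()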

View File

@ -32,7 +32,7 @@ public class EdgeSearchResultItem {
     }
 
     public long getCombinedId() {
-        return ((long) domain.getId() << 32L) | url.getId();
+        return ((long) domain.id() << 32L) | url.id();
     }
 }

View File

@ -121,7 +121,7 @@ public class EdgeSearchOperator {
         int domainId = -1;
         try {
             if (domain != null) {
-                return edgeDataStoreDao.getDomainId(new EdgeDomain(domain)).getId();
+                return edgeDataStoreDao.getDomainId(new EdgeDomain(domain)).id();
             }
         }
         catch (NoSuchElementException ex) {

View File

@ -3,7 +3,6 @@ package nu.marginalia.wmsa.edge.search.command.commands;
 import com.google.inject.Inject;
 import nu.marginalia.wmsa.configuration.server.Context;
 import nu.marginalia.wmsa.edge.data.dao.EdgeDataStoreDao;
-import nu.marginalia.wmsa.edge.data.dao.task.EdgeDomainBlacklist;
 import nu.marginalia.wmsa.edge.index.model.IndexBlock;
 import nu.marginalia.wmsa.edge.model.crawl.EdgeDomainIndexingState;
 import nu.marginalia.wmsa.edge.search.EdgeSearchOperator;
@ -12,7 +11,6 @@ import nu.marginalia.wmsa.edge.search.command.ResponseType;
 import nu.marginalia.wmsa.edge.search.command.SearchCommandInterface;
 import nu.marginalia.wmsa.edge.search.command.SearchParameters;
 import nu.marginalia.wmsa.edge.search.model.DecoratedSearchResultSet;
-import nu.marginalia.wmsa.edge.search.model.DecoratedSearchResults;
 import nu.marginalia.wmsa.edge.search.model.DomainInformation;
 import nu.marginalia.wmsa.edge.search.siteinfo.DomainInformationService;
 import nu.marginalia.wmsa.renderer.mustache.MustacheRenderer;
@ -69,7 +67,7 @@ public class SiteSearchCommand implements SearchCommandInterface {
         if (null != domain) {
             resultSet = searchOperator.performDumbQuery(ctx, EdgeSearchProfile.CORPO, IndexBlock.Words, 100, 100, "site:"+domain);
 
-            screenshotPath = Path.of("/screenshot/" + dataStoreDao.getDomainId(domain).getId());
+            screenshotPath = Path.of("/screenshot/" + dataStoreDao.getDomainId(domain).id());
         }
         else {
             resultSet = new DecoratedSearchResultSet(Collections.emptyList());

View File

@ -78,8 +78,8 @@ public class SearchResultDecorator {
         TIntArrayList missedIds = new TIntArrayList();
         for (var resultItem : resultItems) {
-            var did = resultItem.getDomain().getId();
-            var uid = resultItem.getUrl().getId();
+            var did = resultItem.getDomain().id();
+            var uid = resultItem.getUrl().id();
 
             var details = detailsById.get(uid);
             if (details == null) {

View File

@ -2,7 +2,6 @@ package nu.marginalia.wmsa.edge.search.siteinfo;
 import com.zaxxer.hikari.HikariDataSource;
 import lombok.SneakyThrows;
-import nu.marginalia.wmsa.edge.data.dao.EdgeDataStoreDao;
 import nu.marginalia.wmsa.edge.data.dao.EdgeDataStoreDaoImpl;
 import nu.marginalia.wmsa.edge.model.EdgeDomain;
 import nu.marginalia.wmsa.edge.model.EdgeId;
@ -98,7 +97,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT KNOWN_URLS FROM DOMAIN_METADATA WHERE ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getInt(1);
@ -115,7 +114,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT VISITED_URLS FROM DOMAIN_METADATA WHERE ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getInt(1);
@ -133,7 +132,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT GOOD_URLS FROM DOMAIN_METADATA WHERE ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getInt(1);
@ -150,7 +149,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE DEST_DOMAIN_ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getInt(1);
@ -166,7 +165,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT COUNT(ID) FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getInt(1);
@ -183,7 +182,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT QUALITY FROM EC_DOMAIN WHERE ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getDouble(1);
@ -199,7 +198,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT STATE FROM EC_DOMAIN WHERE ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return EdgeDomainIndexingState.valueOf(rsp.getString(1));
@ -216,8 +215,8 @@ public class DomainInformationService {
     public List<EdgeDomain> getLinkingDomains(EdgeId<EdgeDomain> domainId) {
         try (var connection = dataSource.getConnection()) {
             List<EdgeDomain> results = new ArrayList<>(25);
-            try (var stmt = connection.prepareStatement("SELECT SOURCE_URL FROM EC_RELATED_LINKS_VIEW WHERE DEST_DOMAIN_ID=? ORDER BY SOURCE_DOMAIN_ID LIMIT 25")) {
-                stmt.setInt(1, domainId.getId());
+            try (var stmt = connection.prepareStatement("SELECT SOURCE_DOMAIN FROM EC_RELATED_LINKS_VIEW WHERE DEST_DOMAIN_ID=? ORDER BY SOURCE_DOMAIN_ID LIMIT 25")) {
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 while (rsp.next()) {
                     results.add(new EdgeDomain(rsp.getString(1)));
@ -237,7 +236,7 @@ public class DomainInformationService {
         try (var connection = dataSource.getConnection()) {
             try (var stmt = connection.prepareStatement("SELECT IFNULL(RANK, 1) FROM EC_DOMAIN WHERE ID=?")) {
-                stmt.setInt(1, domainId.getId());
+                stmt.setInt(1, domainId.id());
                 var rsp = stmt.executeQuery();
                 if (rsp.next()) {
                     return rsp.getDouble(1);

View File

@ -81,7 +81,8 @@ public class EdgeIndexClientTest {
         service = new EdgeIndexService("127.0.0.1",
                 testPort,
                 init, null,
-                indexes);
+                indexes,
+                servicesFactory);
 
         Spark.awaitInitialization();
         init.setReady();
@ -113,7 +114,7 @@ public class EdgeIndexClientTest {
         indexes.reindexAll();
         var rsp = client.query(Context.internal(), EdgeSearchSpecification.justIncludes("trapphus"));
         System.out.println(rsp);
-        assertEquals(5, rsp.resultsList.get(IndexBlock.Title).get(0).results.get(0).get(0).url.getId());
+        assertEquals(5, rsp.resultsList.get(IndexBlock.Title).get(0).results.get(0).get(0).url.id());
     }

View File

@ -0,0 +1,76 @@
package nu.marginalia.wmsa.edge.index.service;

import lombok.SneakyThrows;
import nu.marginalia.util.multimap.MultimapFileLong;
import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntry;
import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalEntryHeader;
import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalReader;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter;
import nu.marginalia.wmsa.edge.index.reader.SearchIndexReader;
import nu.marginalia.wmsa.edge.index.journal.SearchIndexJournalWriterImpl;
import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;

class SearchIndexJournalWriterTest {
    DictionaryWriter dictionaryWriter;
    SearchIndexJournalWriterImpl writer;

    Path indexFile;
    Path wordsFile1;
    Path urlsFile1;
    Path dictionaryFile;

    private final Logger logger = LoggerFactory.getLogger(getClass());

    @BeforeEach @SneakyThrows
    void setUp() {
        dictionaryFile = Files.createTempFile("tmp", ".dict");
        dictionaryFile.toFile().deleteOnExit();

        dictionaryWriter = new DictionaryWriter(dictionaryFile.toFile(), 1L << 16, false);

        indexFile = Files.createTempFile("tmp", ".idx");
        indexFile.toFile().deleteOnExit();
        writer = new SearchIndexJournalWriterImpl(dictionaryWriter, indexFile.toFile());

        wordsFile1 = Files.createTempFile("words1", ".idx");
        urlsFile1 = Files.createTempFile("urls1", ".idx");
    }

    @SneakyThrows
    @AfterEach
    void tearDown() {
        dictionaryWriter.close();
        writer.close();
        indexFile.toFile().delete();
        dictionaryFile.toFile().delete();
        urlsFile1.toFile().delete();
        wordsFile1.toFile().delete();
    }

    @Test
    void put() throws IOException {
        writer.put(new SearchIndexJournalEntryHeader(4, (1234L << 32) | 5678, IndexBlock.Link),
                new SearchIndexJournalEntry(new long[] { 1, 2, 3, 4 }));
        writer.put(new SearchIndexJournalEntryHeader(4, (2345L << 32) | 2244, IndexBlock.Words),
                new SearchIndexJournalEntry(new long[] { 5, 6, 7 }));
        writer.forceWrite();

        var reader = new SearchIndexJournalReader(MultimapFileLong.forReading(indexFile));
        reader.forEach(entry -> {
            logger.info("{}, {} {}", entry, entry.urlId(), entry.domainId());
            logger.info("{}", entry.readEntry().toArray());
        });
    }
}

View File

@ -1,90 +0,0 @@
package nu.marginalia.wmsa.edge.index.service;

import lombok.SneakyThrows;
import nu.marginalia.wmsa.edge.index.conversion.SearchIndexPartitioner;
import nu.marginalia.wmsa.edge.index.model.IndexBlock;
import nu.marginalia.wmsa.edge.index.dictionary.DictionaryWriter;
import nu.marginalia.wmsa.edge.index.reader.SearchIndex;
import nu.marginalia.wmsa.edge.index.conversion.SearchIndexConverter;
import nu.marginalia.wmsa.edge.index.reader.SearchIndexReader;
import nu.marginalia.wmsa.edge.index.journal.SearchIndexWriterImpl;
import nu.marginalia.wmsa.edge.index.reader.query.IndexSearchBudget;
import nu.marginalia.wmsa.edge.model.EdgeId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.EnumMap;

import static nu.marginalia.util.dict.DictionaryHashMap.NO_VALUE;
import static org.junit.jupiter.api.Assertions.*;

class SearchIndexWriterTest {
    DictionaryWriter dictionaryWriter;
    SearchIndexWriterImpl writer;

    Path indexFile;
    Path wordsFile1;
    Path urlsFile1;
    Path dictionaryFile;

    @BeforeEach @SneakyThrows
    void setUp() {
        dictionaryFile = Files.createTempFile("tmp", ".dict");
        dictionaryFile.toFile().deleteOnExit();

        dictionaryWriter = new DictionaryWriter(dictionaryFile.toFile(), 1L << 16, false);

        indexFile = Files.createTempFile("tmp", ".idx");
        indexFile.toFile().deleteOnExit();
        writer = new SearchIndexWriterImpl(dictionaryWriter, indexFile.toFile());

        wordsFile1 = Files.createTempFile("words1", ".idx");
        urlsFile1 = Files.createTempFile("urls1", ".idx");
    }

    @SneakyThrows
    @AfterEach
    void tearDown() {
        dictionaryWriter.close();
        writer.close();
        indexFile.toFile().delete();
        dictionaryFile.toFile().delete();
        urlsFile1.toFile().delete();
        wordsFile1.toFile().delete();
    }

    public long[] findWord(SearchIndexReader reader, String word, IndexBlock block) {
        IndexSearchBudget budget = new IndexSearchBudget(100);
        return reader.findWord(block, budget, lv -> true, dictionaryWriter.getReadOnly(word)).stream().toArray();
    }

    @Test @SneakyThrows
    void put() throws IOException {
        writer.put(new EdgeId<>(0), new EdgeId<>(1), IndexBlock.Words, Arrays.asList("Hello", "Salvete", "everyone!", "This", "is", "Bob"));
        writer.put(new EdgeId<>(0), new EdgeId<>(2), IndexBlock.Words, Arrays.asList("Salvete", "omnes!", "Bob", "sum", "Hello"));
        writer.forceWrite();

        new SearchIndexConverter(IndexBlock.Words, 0, Path.of("/tmp"), indexFile.toFile(), wordsFile1.toFile(), urlsFile1.toFile(), new SearchIndexPartitioner(null), val -> false);

        EnumMap<IndexBlock, SearchIndex> indices = new EnumMap<IndexBlock, SearchIndex>(IndexBlock.class);
        indices.put(IndexBlock.Words, new SearchIndex("0", urlsFile1.toFile(), wordsFile1.toFile()));

        var reader = new SearchIndexReader(indices);

        int bobId = dictionaryWriter.getReadOnly("Bob");
        assertNotEquals(NO_VALUE, bobId);

        assertEquals(2, reader.numHits(IndexBlock.Words, bobId));
        assertArrayEquals(new long[] { 1, 2 }, findWord(reader, "Bob", IndexBlock.Words));
        assertArrayEquals(new long[] { 2 }, findWord(reader, "sum", IndexBlock.Words));
        assertArrayEquals(new long[] { }, findWord(reader, "New Word", IndexBlock.Words));

        writer.close();
    }
}

View File

@ -67,4 +67,37 @@ class RandomWriteFunnelTest {
             }
         }
     }
+
+    @Test
+    public void testYuge() {
+        new File("/tmp/test.bin").delete();
+        for (int j = 1; j <= 20; j++) {
+            try (var funnel = new RandomWriteFunnel(Path.of("/tmp"), 10, j);
+                 var out = new RandomAccessFile("/tmp/test.bin", "rw")) {
+                for (int i = 10 - 1; i >= 0; i -= 2) {
+                    funnel.put(i, Long.MAX_VALUE - i);
+                }
+                funnel.write(out.getChannel());
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+
+            try (var in = new RandomAccessFile("/tmp/test.bin", "r")) {
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+                in.readLong();
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        }
+    }
 }

View File

@ -3,18 +3,15 @@ package com.upserve.uppend.blobs;
 import jnr.ffi.*;
 import jnr.ffi.types.size_t;
-import org.slf4j.Logger;
 import com.kenai.jffi.MemoryIO;
 
 import java.io.IOException;
-import java.lang.invoke.MethodHandles;
 import java.nio.*;
 
 // https://github.com/upserve/uppend/blob/70967c6f24d7f1a3bbc18799f485d981da93f53b/src/main/java/com/upserve/uppend/blobs/NativeIO.java
 // MIT License
 
 public class NativeIO {
-    private static final Logger log = org.slf4j.LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
     private static final NativeC nativeC = LibraryLoader.create(NativeC.class).load("c");
     public static final int pageSize = nativeC.getpagesize(); // 4096 on most Linux