(index-construction) Split repartition into two actions

This change splits the previous 'repartition' action into two steps, one for recalculating the domain rankings, and one for recalculating the other ranking sets.  Since only the first is necessary before the index construction, the rest can be delayed until after...

To avoid issues in handling the shotgun blast of MqNotifications, Service was switched over to use a synchronous message queue instead of an asynchronous one.

The change also modifies the behavior so that only node 1 will push the changes to the EC_DOMAIN database table, to avoid unnecessary db locks and contention with the loader.

Additionally, the change fixes a bug where the index construction code wasn't actually picking up the rankings data.

Since the index construction used to be performed by the index-service, merely saving the data to memory was enough for it to be accessible within the index-construction logic, but since it's been broken out into a separate process, the new process just injected an empty DomainRankings object instead.

To fix this, DomainRankings can now be persisted to disk, and a pre-loaded version of the object is injected into the index-construction process.
This commit is contained in:
Viktor Lofgren 2024-02-06 17:20:07 +01:00
parent 29ddf9e61d
commit 467ba5be20
11 changed files with 134 additions and 28 deletions

View File

@ -85,4 +85,12 @@ public class IndexClient extends AbstractDynamicClient {
null
);
}
public long triggerRerank(int node) throws Exception {
return messageQueueFactory.sendSingleShotRequest(
ServiceId.Index.withNode(node),
IndexMqEndpoints.INDEX_RERANK,
null
);
}
}

View File

@ -2,6 +2,7 @@ package nu.marginalia.index.client;
public class IndexMqEndpoints {
public static final String INDEX_IS_BLOCKED = "INDEX-IS-BLOCKED";
public static final String INDEX_RERANK = "INDEX-RERANK";
public static final String INDEX_REPARTITION = "INDEX-REPARTITION";
public static final String SWITCH_INDEX = "SWITCH-INDEX";

View File

@ -53,7 +53,7 @@ public class Service {
logger.info("Inbox name: {}", inboxName);
var mqInboxFactory = params.messageQueueInboxFactory;
messageQueueInbox = mqInboxFactory.createAsynchronousInbox(inboxName, config.node(), config.instanceUuid());
messageQueueInbox = mqInboxFactory.createSynchronousInbox(inboxName, config.node(), config.instanceUuid());
messageQueueInbox.subscribe(new ServiceMqSubscription(this));
serviceName = System.getProperty("service-name");

View File

@ -6,11 +6,19 @@ import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import static java.lang.Math.max;
import static java.lang.Math.min;
public class DomainRankings {
private final Int2ShortOpenHashMap rankings;
private static final Logger logger = LoggerFactory.getLogger(DomainRankings.class);
private final int MAX_MEANINGFUL_RANK = 50_000;
private final int MAX_RANK_VALUE = 255;
@ -25,6 +33,51 @@ public class DomainRankings {
values.forEach(this::putRanking);
}
private static final String name = "_rankings.dat";
public void save(Path basePath) {
Path fileName = basePath.resolve(name);
logger.info("Saving domain rankings to {}", fileName);
try (DataOutputStream dos = new DataOutputStream(Files.newOutputStream(fileName,
StandardOpenOption.WRITE,
StandardOpenOption.CREATE,
StandardOpenOption.TRUNCATE_EXISTING)))
{
rankings.forEach((domainId, rank) -> {
try {
dos.writeInt(domainId);
dos.writeShort(rank);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public void load(Path basePath) {
Path fileName = basePath.resolve(name);
logger.info("Loading domain rankings from {}", fileName);
try (DataInputStream dis = new DataInputStream(Files.newInputStream(fileName))) {
rankings.clear();
for (;;) {
int domainId = dis.readInt();
short rank = dis.readShort();
rankings.put(domainId, rank);
}
}
catch (EOFException e) {
// ok
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private void putRanking(int domainId, int value) {
rankings.put(domainId, scaleRank(value));
}

View File

@ -97,7 +97,7 @@ public class IndexConstructorMain extends ProcessMainClass {
heartbeat.start();
switch (instructions.name) {
case FORWARD -> createForwardIndex();
case FORWARD -> createForwardIndex();
case REVERSE_FULL -> createFullReverseIndex();
case REVERSE_PRIO -> createPrioReverseIndex();
}

View File

@ -1,12 +1,23 @@
package nu.marginalia.index;
import com.google.inject.AbstractModule;
import nu.marginalia.ProcessConfiguration;
import java.util.UUID;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import nu.marginalia.IndexLocations;
import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.storage.FileStorageService;
public class IndexConstructorModule extends AbstractModule {
@Override
public void configure() {
}
@Provides @Singleton
public DomainRankings getDomainRankings(FileStorageService fileStorageService) {
var rankings = new DomainRankings();
rankings.load(IndexLocations.getSearchSetsPath(fileStorageService));
return rankings;
}
}

View File

@ -38,7 +38,7 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
// STATES
public static final String REPARTITION = "REPARTITION";
public static final String RERANK = "RERANK";
private final ActorProcessWatcher processWatcher;
private final MqOutbox mqConverterOutbox;
private final MqOutbox mqLoaderOutbox;
@ -46,7 +46,6 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
private final MqOutbox indexOutbox;
private final FileStorageService storageService;
private final BackupService backupService;
private final Gson gson;
private final NodeConfigurationService nodeConfigurationService;
private final int nodeId;
@ -74,14 +73,14 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Backup(List<FileStorageId> processedIds) implements ActorStep { }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record Repartition(long id) implements ActorStep { public Repartition() { this(-1); } }
public record Rerank(long id) implements ActorStep { public Rerank() { this(-1); } }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ReindexFwd(long id) implements ActorStep { public ReindexFwd() { this(-1); } }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ReindexFull(long id) implements ActorStep { public ReindexFull() { this(-1); } }
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ReindexPrio(long id) implements ActorStep { public ReindexPrio() { this(-1); } }
public record SwitchOver() implements ActorStep {}
public record SwitchIndex() implements ActorStep {}
@Override
public ActorStep transition(ActorStep self) throws Exception {
@ -129,11 +128,11 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
}
case Backup(List<FileStorageId> processedIds) -> {
backupService.createBackupFromStaging(processedIds);
yield new Repartition();
yield new Rerank();
}
case Repartition(long id) when id < 0 ->
new Repartition(indexOutbox.sendAsync(IndexMqEndpoints.INDEX_REPARTITION, ""));
case Repartition(long id) -> {
case Rerank(long id) when id < 0 ->
new Rerank(indexOutbox.sendAsync(IndexMqEndpoints.INDEX_RERANK, ""));
case Rerank(long id) -> {
var rsp = indexOutbox.waitResponse(id);
if (rsp.state() != MqMessageState.OK) {
yield new Error("Repartition failed");
@ -166,12 +165,15 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
if (rsp.state() != MqMessageState.OK)
yield new Error("Repartition failed");
else
yield new SwitchOver();
yield new SwitchIndex();
}
case SwitchOver() -> {
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_INDEX, ":^D");
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_LINKDB, ":-)");
case SwitchIndex() -> {
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_INDEX, "here");
indexOutbox.sendNotice(IndexMqEndpoints.SWITCH_LINKDB, "we");
// Defer repartitioning the domains until after the index has been switched
indexOutbox.sendNotice(IndexMqEndpoints.INDEX_REPARTITION, "go");
yield new End();
}
@ -207,7 +209,6 @@ public class ConvertAndLoadActor extends RecordActorPrototype {
this.mqIndexConstructorOutbox = processOutboxes.getIndexConstructorOutbox();
this.storageService = storageService;
this.backupService = backupService;
this.gson = gson;
this.nodeConfigurationService = nodeConfigurationService;
this.nodeId = serviceConfiguration.node();

View File

@ -29,7 +29,7 @@ public class RestoreBackupActor extends RecordActorPrototype {
ExecutorActor.CONVERT_AND_LOAD.id(node),
null,
null,
ConvertAndLoadActor.REPARTITION,
ConvertAndLoadActor.RERANK,
"",
null);

View File

@ -97,6 +97,14 @@ public class IndexService extends Service {
volatile boolean initialized = false;
@MqRequest(endpoint = IndexMqEndpoints.INDEX_RERANK)
public String rerank(String message) {
if (!opsService.rerank()) {
throw new IllegalStateException("Ops lock busy");
}
return "ok";
}
@MqRequest(endpoint = IndexMqEndpoints.INDEX_REPARTITION)
public String repartition(String message) {
if (!opsService.repartition()) {

View File

@ -30,9 +30,14 @@ public class IndexOpsService {
return opsLock.isLocked();
}
public boolean repartition() {
return run(searchSetService::recalculateAll);
public boolean rerank() {
return run(searchSetService::recalculatePrimaryRank);
}
public boolean repartition() {
return run(searchSetService::recalculateSecondary);
}
public boolean switchIndex() throws Exception {
return run(index::switchIndex).isPresent();
}
@ -40,7 +45,7 @@ public class IndexOpsService {
public Object repartitionEndpoint(Request request, Response response) throws Exception {
if (!run(searchSetService::recalculateAll)) {
if (!run(searchSetService::recalculateSecondary)) {
Spark.halt(503, "Operations busy");
}

View File

@ -21,6 +21,7 @@ import nu.marginalia.ranking.DomainRankings;
import nu.marginalia.index.db.DbUpdateRanks;
import nu.marginalia.service.control.ServiceEventLog;
import nu.marginalia.service.control.ServiceHeartbeat;
import nu.marginalia.service.module.ServiceConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,12 +45,14 @@ public class IndexSearchSetsService {
private final ConcurrentHashMap<String, SearchSet> rankingSets = new ConcurrentHashMap<>();
// Below are binary indices that are used to constrain a search
private final SearchSet anySet = new SearchSetAny();
private final int nodeId;
// The ranking value of the domains used in sorting the domains
private volatile DomainRankings domainRankings = new DomainRankings();
@Inject
public IndexSearchSetsService(DomainTypes domainTypes,
ServiceConfiguration serviceConfiguration,
ServiceHeartbeat heartbeat,
RankingDomainFetcher rankingDomains,
RankingDomainFetcherForSimilarityData similarityDomains,
@ -57,6 +60,7 @@ public class IndexSearchSetsService {
ServiceEventLog eventLog,
DomainRankingSetsService domainRankingSetsService,
DbUpdateRanks dbUpdateRanks) throws IOException {
this.nodeId = serviceConfiguration.node();
this.domainTypes = domainTypes;
this.heartbeat = heartbeat;
this.indexServicesFactory = indexServicesFactory;
@ -102,14 +106,25 @@ public class IndexSearchSetsService {
return Objects.requireNonNull(rankingSets.get(searchSetIdentifier), "Unknown search set");
}
public void recalculateAll() {
/** Recalculates the primary ranking set. This gets baked into the identifiers in the index, effectively
* changing their sort order, so it's important to run this before reconstructing the indices. */
public void recalculatePrimaryRank() {
try {
domainRankingSetsService.get("RANK").ifPresent(this::updateDomainRankings);
eventLog.logEvent("RANKING-SET-RECALCULATED", "RANK");
} catch (SQLException e) {
logger.warn("Failed to primary ranking set", e);
}
}
public void recalculateSecondary() {
for (var rankingSet : domainRankingSetsService.getAll()) {
try {
if (DomainRankingSetsService.DomainSetAlgorithm.SPECIAL.equals(rankingSet.algorithm())) {
switch (rankingSet.name()) {
case "BLOGS" -> recalculateBlogsSet(rankingSet);
case "RANK" -> updateDomainRankings(rankingSet);
case "NONE" -> {}
case "RANK" -> {} // Skipped, handled via recalculatePrimaryRank()
case "NONE" -> {} // No-op
}
} else {
recalculateNornal(rankingSet);
@ -173,9 +188,13 @@ public class IndexSearchSetsService {
domainRankings = new DomainRankings(ranks);
}
// The EC_DOMAIN table has a field that reflects the rank, this needs to be set for search result ordering to
// make sense
dbUpdateRanks.execute(ranks);
domainRankings.save(indexServicesFactory.getSearchSetsBase());
if (nodeId == 1) {
// The EC_DOMAIN table has a field that reflects the rank, this needs to be set for search result ordering to
// make sense, but only do this on the primary node to avoid excessive db locks
dbUpdateRanks.execute(ranks);
}
}
}