(control) Temporarily re-writing the data balancer to get it to work in prod

Need to clean this up later.
Viktor Lofgren 2023-10-21 16:33:59 +02:00
parent e927f99777
commit 12fda1a36b
5 changed files with 136 additions and 124 deletions

View File

@@ -15,7 +15,7 @@ public class AbstractDynamicClient extends AbstractClient {
public AbstractDynamicClient(@Nonnull ServiceDescriptor service, Supplier<Gson> gsonProvider) {
super(
service,
10,
10000,
gsonProvider
);

View File

@@ -6,6 +6,9 @@ import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.mq.MqMessageState;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.nodecfg.NodeConfigurationService;
import nu.marginalia.nodecfg.model.NodeConfiguration;
@@ -13,18 +16,22 @@ import org.jetbrains.annotations.NotNull;
import java.sql.SQLException;
import java.util.*;
import com.google.gson.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RebalanceActor extends AbstractActorPrototype {
// States
public static final String INIT = "INIT";
public static final String CALCULATE_TRANSACTIONS = "CALCULATE_TRANSACTIONS";
public static final String END = "END";
private static final Logger logger = LoggerFactory.getLogger(RebalanceActor.class);
private final NodeConfigurationService nodeConfigurationService;
private final MqPersistence mqPersistence;
private final HikariDataSource dataSource;
private final Gson gson = GsonFactory.get();
@Override
public String describe() {
return "Rebalances crawl data among the nodes";
@@ -41,16 +48,10 @@ public class RebalanceActor extends AbstractActorPrototype {
this.dataSource = dataSource;
}
@ActorState(name= INIT, next = CALCULATE_TRANSACTIONS, resume = ActorResumeBehavior.ERROR,
description = "Fetches the number of domains assigned to each eligible processing node")
public List<Pop> getPopulations() throws Exception {
return getNodePopulations();
}
@ActorState(name= CALCULATE_TRANSACTIONS, next = END, resume = ActorResumeBehavior.ERROR,
description = "Calculates how many domains to re-assign between the processing nodes"
)
public List<Give> calculateTransactions(List<Pop> populations) {
@ActorState(name= INIT, next = END, resume = ActorResumeBehavior.ERROR,
description = "Rebalance!")
public void doIt() throws Exception {
var populations = getNodePopulations();
if (populations.size() <= 1) {
transition(END);
@@ -91,7 +92,16 @@ public class RebalanceActor extends AbstractActorPrototype {
}
}
return actions;
for (var action : actions) {
var outbox = new MqOutbox(mqPersistence, "executor-service", action.dest, getClass().getSimpleName(), 0, UUID.randomUUID());
var msg = outbox.send("TRANSFER-DOMAINS",
gson.toJson(Map.of("sourceNode", action.donor, "count", action.c)));
if (msg.state() != MqMessageState.OK) {
logger.error("ERROR! {}", msg);
}
outbox.stop();
}
}
private List<Pop> getNodePopulations() throws SQLException {
@@ -163,6 +173,8 @@ public class RebalanceActor extends AbstractActorPrototype {
}
}
public record Populations(List<Pop> pops) {
}
public record Pop(int node, int count) {
}
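
As a side note on the new flow above: the TRANSFER-DOMAINS payload is just a small JSON object serialized with Gson, whose keys match the Map.of(...) call in doIt() and the TransferReq record added to TransferService further down. A minimal sketch of what goes over the message queue (the node id and count here are made-up example values, not taken from this commit):

import com.google.gson.Gson;
import java.util.Map;

class TransferDomainsPayloadSketch {
    public static void main(String[] args) {
        // Illustrative values only; the real actor fills these in from its
        // calculated Give actions (action.donor and action.c).
        int sourceNode = 2;
        int count = 150;

        Gson gson = new Gson();
        String payload = gson.toJson(Map.of("sourceNode", sourceNode, "count", count));

        // Prints something like {"sourceNode":2,"count":150} (key order may vary).
        System.out.println(payload);
    }
}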

View File

@@ -8,35 +8,18 @@ import lombok.NoArgsConstructor;
import lombok.With;
import nu.marginalia.actor.ActorStateFactory;
import nu.marginalia.actor.prototype.AbstractActorPrototype;
import nu.marginalia.actor.state.ActorResumeBehavior;
import nu.marginalia.actor.state.ActorState;
import nu.marginalia.client.Context;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.mqapi.ProcessInboxNames;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.storage.model.FileStorageType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
@Singleton
public class TransferDomainsActor extends AbstractActorPrototype {
// STATES
public static final String INITIAL = "INITIAL";
public static final String TRANSFER_DOMAINS = "TRANSFER-DOMAINS";
public static final String UPDATE_DONOR_LOG = "UPDATE_DONOR_LOG";
public static final String END = "END";
private final FileStorageService storageService;
@@ -77,101 +60,13 @@ public class TransferDomainsActor extends AbstractActorPrototype {
}
@ActorState(name = INITIAL,
next = TRANSFER_DOMAINS,
next = END,
description = """
Ensure preconditions are met
Transfer the domains
""")
public Message init(Message message) throws Exception {
var storages = storageService.getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA);
public void init(Message message) throws Exception {
// Ensure crawl data exists to receive into
if (storages.isEmpty()) {
var storage = storageService.allocateTemporaryStorage(
storageService.getStorageBase(FileStorageBaseType.STORAGE),
FileStorageType.CRAWL_DATA,
"crawl-data",
"Crawl Data"
);
storageService.enableFileStorage(storage.id());
}
return message;
}
@ActorState(name = TRANSFER_DOMAINS,
next = UPDATE_DONOR_LOG,
resume = ActorResumeBehavior.ERROR,
description = """
Do the needful
"""
)
public Message transferData(Message message) throws Exception {
var storageId = storageService
.getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA)
.orElseThrow(AssertionError::new); // This Shouldn't Happen (tm)
var storage = storageService.getStorage(storageId);
var spec = executorClient.getTransferSpec(Context.internal(), message.sourceNode, message.count);
if (spec.size() == 0) {
transition("END", "NOTHING TO TRANSFER");
}
Path basePath = storage.asPath();
try (var workLog = new WorkLog(basePath.resolve("crawler.log"));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE ID=?");
) {
for (var item : spec.items()) {
logger.info("{}", item);
logger.info("Transferring {}", item.domainName());
Path dest = basePath.resolve(item.path());
Files.createDirectories(dest.getParent());
try (var fileStream = Files.newOutputStream(dest)) {
executorClient.transferFile(Context.internal(),
message.sourceNode,
item.fileStorageId(),
item.path(),
fileStream);
stmt.setInt(1, nodeId);
stmt.setInt(2, item.domainId());
stmt.executeUpdate();
executorClient.yieldDomain(Context.internal(), message.sourceNode, item);
workLog.setJobToFinished(item.domainName(), item.path(), 1);
}
catch (IOException ex) {
Files.deleteIfExists(dest);
error(ex);
}
catch (Exception ex) {
error(ex);
}
}
}
return message;
}
@ActorState(name = UPDATE_DONOR_LOG,
next = END,
resume = ActorResumeBehavior.ERROR,
description = """
Do the needful
"""
)
public void updateDonorLog(Message message) throws InterruptedException {
var outbox = new MqOutbox(persistence, executorServiceName, message.sourceNode,
getClass().getSimpleName(), nodeId, UUID.randomUUID());
try {
outbox.send("PRUNE-CRAWL-DATA", ":-)");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
outbox.stop();
}
}
}

View File

@@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
// Weird name for this one to not have clashes with java.util.concurrent.ExecutorService
public class ExecutorSvc extends Service {
private final BaseServiceParams params;
private final Gson gson;
private final ExecutorActorControlService actorControlService;
private final FileStorageService fileStorageService;
private final TransferService transferService;
@@ -51,6 +52,7 @@ public class ExecutorSvc extends Service {
ActorApi actorApi) {
super(params);
this.params = params;
this.gson = gson;
this.actorControlService = actorControlService;
this.fileStorageService = fileStorageService;
this.transferService = transferService;
@@ -92,9 +94,26 @@ public class ExecutorSvc extends Service {
actorControlService.start(ExecutorActor.PROC_INDEX_CONSTRUCTOR_SPAWNER);
actorControlService.start(ExecutorActor.PROC_LOADER_SPAWNER);
}
@MqRequest(endpoint="TRANSFER-DOMAINS")
public String transferDomains(String message) throws Exception {
var spec = gson.fromJson(message, TransferService.TransferReq.class);
synchronized (this) {
transferService.transferMqEndpoint(spec.sourceNode(), spec.count());
}
return "OK";
}
@MqRequest(endpoint="PRUNE-CRAWL-DATA")
public String pruneCrawlData(String message) throws SQLException, IOException {
transferService.pruneCrawlDataMqEndpoint();
synchronized (this) { // would not be great if this ran in parallel with itself
transferService.pruneCrawlDataMqEndpoint();
}
return "OK";
}

View File

@@ -4,13 +4,18 @@ import com.google.gson.Gson;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.client.Context;
import nu.marginalia.executor.client.ExecutorClient;
import nu.marginalia.executor.model.transfer.TransferItem;
import nu.marginalia.executor.model.transfer.TransferSpec;
import nu.marginalia.executor.storage.FileStorageContent;
import nu.marginalia.executor.storage.FileStorageFile;
import nu.marginalia.mq.outbox.MqOutbox;
import nu.marginalia.mq.persistence.MqPersistence;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.ServiceConfiguration;
import nu.marginalia.storage.FileStorageService;
import nu.marginalia.storage.model.FileStorageBaseType;
import nu.marginalia.storage.model.FileStorageId;
import nu.marginalia.storage.model.FileStorageType;
import org.apache.commons.io.FileUtils;
@@ -27,11 +32,15 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.UUID;
public class TransferService {
private final Gson gson;
private final FileStorageService fileStorageService;
private final HikariDataSource dataSource;
private final ExecutorClient executorClient;
private final MqPersistence persistence;
private final String executorServiceName;
private final int nodeId;
private static final Logger logger = LoggerFactory.getLogger(TransferService.class);
@@ -40,12 +49,15 @@ public class TransferService {
Gson gson,
FileStorageService fileStorageService,
HikariDataSource dataSource,
ServiceConfiguration config)
ExecutorClient executorClient, MqPersistence persistence, ServiceConfiguration config)
{
this.gson = gson;
this.fileStorageService = fileStorageService;
this.dataSource = dataSource;
this.executorClient = executorClient;
this.persistence = persistence;
this.nodeId = config.node();
this.executorServiceName = config.serviceName();
}
public Object transferFile(Request request, Response response) throws SQLException, IOException {
@@ -169,4 +181,78 @@ public class TransferService {
Files.move(newCrawlLogPath, oldCrawlLogPath, StandardCopyOption.REPLACE_EXISTING);
}
public void transferMqEndpoint(int sourceNode, int count) throws Exception {
var storages = fileStorageService.getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA);
// Ensure crawl data exists to receive into
if (storages.isEmpty()) {
var storage = fileStorageService.allocateTemporaryStorage(
fileStorageService.getStorageBase(FileStorageBaseType.STORAGE),
FileStorageType.CRAWL_DATA,
"crawl-data",
"Crawl Data"
);
fileStorageService.enableFileStorage(storage.id());
}
var storageId = fileStorageService
.getOnlyActiveFileStorage(FileStorageType.CRAWL_DATA)
.orElseThrow(AssertionError::new); // This Shouldn't Happen (tm)
var storage = fileStorageService.getStorage(storageId);
var spec = executorClient.getTransferSpec(Context.internal(), sourceNode, count);
if (spec.size() == 0) {
return;
}
Path basePath = storage.asPath();
try (var workLog = new WorkLog(basePath.resolve("crawler.log"));
var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("UPDATE EC_DOMAIN SET NODE_AFFINITY=? WHERE ID=?");
) {
for (var item : spec.items()) {
logger.info("{}", item);
logger.info("Transferring {}", item.domainName());
Path dest = basePath.resolve(item.path());
Files.createDirectories(dest.getParent());
try (var fileStream = Files.newOutputStream(dest)) {
executorClient.transferFile(Context.internal(),
sourceNode,
item.fileStorageId(),
item.path(),
fileStream);
stmt.setInt(1, nodeId);
stmt.setInt(2, item.domainId());
stmt.executeUpdate();
executorClient.yieldDomain(Context.internal(), sourceNode, item);
workLog.setJobToFinished(item.domainName(), item.path(), 1);
}
catch (IOException ex) {
Files.deleteIfExists(dest);
throw new RuntimeException(ex);
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
var outbox = new MqOutbox(persistence, executorServiceName, sourceNode,
getClass().getSimpleName(), nodeId, UUID.randomUUID());
try {
outbox.send("PRUNE-CRAWL-DATA", ":-)");
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
outbox.stop();
}
}
public record TransferReq(int sourceNode, int count) { }
}
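
For completeness, the TransferReq record above is what the TRANSFER-DOMAINS endpoint in ExecutorSvc deserializes the message body into. A minimal round-trip sketch, assuming a Gson version with record support (2.10 or later); the values are illustrative:

import com.google.gson.Gson;

class TransferReqRoundTripSketch {
    // Mirrors TransferService.TransferReq from this commit.
    record TransferReq(int sourceNode, int count) { }

    public static void main(String[] args) {
        // Gson 2.10+ populates records reflectively; the project's own
        // GsonFactory is assumed to be configured compatibly.
        Gson gson = new Gson();
        TransferReq req = gson.fromJson("{\"sourceNode\":2,\"count\":150}", TransferReq.class);

        System.out.println(req.sourceNode() + " -> " + req.count()); // 2 -> 150
    }
}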