Process monitoring

* Also refactored the SQL tables a bit
This commit is contained in:
Viktor Lofgren 2023-07-11 14:46:21 +02:00
parent f59cab300e
commit 4c016b0318
37 changed files with 431 additions and 78 deletions

View File

@ -1,17 +0,0 @@
-- Liveness record for each service, keyed by the full service name.
CREATE TABLE PROC_SERVICE_HEARTBEAT (
    SERVICE_NAME   VARCHAR(255) PRIMARY KEY
                   COMMENT 'Full name of the service, including node id if applicable, e.g. search-service:0',
    SERVICE_BASE   VARCHAR(255) NOT NULL
                   COMMENT 'Base name of the service, e.g. search-service',
    INSTANCE       VARCHAR(255) NOT NULL
                   COMMENT 'UUID of the service instance',
    ALIVE          BOOLEAN NOT NULL DEFAULT TRUE
                   COMMENT 'Set to false when the service is doing an orderly shutdown',
    HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
                   COMMENT 'Service was last seen at this point'
);
-- Append-only log of service events; ID is an auto-incremented surrogate key.
CREATE TABLE PROC_SERVICE_EVENTLOG (
    ID            BIGINT AUTO_INCREMENT PRIMARY KEY
                  COMMENT 'Unique id',
    SERVICE_NAME  VARCHAR(255) NOT NULL
                  COMMENT 'Full name of the service, including node id if applicable, e.g. search-service:0',
    SERVICE_BASE  VARCHAR(255) NOT NULL
                  COMMENT 'Base name of the service, e.g. search-service',
    INSTANCE      VARCHAR(255) NOT NULL
                  COMMENT 'UUID of the service instance',
    EVENT_TIME    TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6)
                  COMMENT 'Event time',
    EVENT_TYPE    VARCHAR(255) NOT NULL
                  COMMENT 'Event type',
    EVENT_MESSAGE VARCHAR(255) NOT NULL
                  COMMENT 'Event message'
);

View File

@ -1,4 +1,4 @@
CREATE TABLE PROC_SERVICE_HEARTBEAT(
CREATE TABLE IF NOT EXISTS SERVICE_HEARTBEAT (
SERVICE_NAME VARCHAR(255) PRIMARY KEY COMMENT "Full name of the service, including node id if applicable, e.g. search-service:0",
SERVICE_BASE VARCHAR(255) NOT NULL COMMENT "Base name of the service, e.g. search-service",
INSTANCE VARCHAR(255) NOT NULL COMMENT "UUID of the service instance",
@ -6,7 +6,16 @@ CREATE TABLE PROC_SERVICE_HEARTBEAT(
HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT "Service was last seen at this point"
);
CREATE TABLE PROC_SERVICE_EVENTLOG(
CREATE TABLE IF NOT EXISTS PROCESS_HEARTBEAT (
PROCESS_NAME VARCHAR(255) PRIMARY KEY COMMENT "Full name of the process, including node id if applicable, e.g. converter:0",
PROCESS_BASE VARCHAR(255) NOT NULL COMMENT "Base name of the process, e.g. converter",
INSTANCE VARCHAR(255) NOT NULL COMMENT "UUID of the process instance",
STATUS ENUM ('STARTING', 'RUNNING', 'STOPPED') NOT NULL DEFAULT 'STARTING' COMMENT "Status of the process",
PROGRESS INT NOT NULL DEFAULT 0 COMMENT "Progress of the process",
HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT "Process was last seen at this point"
);
CREATE TABLE IF NOT EXISTS SERVICE_EVENTLOG(
ID BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT "Unique id",
SERVICE_NAME VARCHAR(255) NOT NULL COMMENT "Full name of the service, including node id if applicable, e.g. search-service:0",
SERVICE_BASE VARCHAR(255) NOT NULL COMMENT "Base name of the service, e.g. search-service",
@ -14,4 +23,5 @@ CREATE TABLE PROC_SERVICE_EVENTLOG(
EVENT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT "Event time",
EVENT_TYPE VARCHAR(255) NOT NULL COMMENT "Event type",
EVENT_MESSAGE VARCHAR(255) NOT NULL COMMENT "Event message"
);
);

View File

@ -1,22 +1,17 @@
CREATE TABLE PROC_MESSAGE(
CREATE TABLE IF NOT EXISTS MESSAGE_QUEUE (
ID BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Unique id',
RELATED_ID BIGINT COMMENT 'Unique id a related message',
SENDER_INBOX VARCHAR(255) COMMENT 'Name of the sender inbox',
RECIPIENT_INBOX VARCHAR(255) NOT NULL COMMENT 'Name of the recipient inbox',
FUNCTION VARCHAR(255) NOT NULL COMMENT 'Which function to run',
PAYLOAD TEXT COMMENT 'Message to recipient',
-- These fields are used to avoid double processing of messages
-- instance marks the unique instance of the party, and the tick marks
-- the current polling iteration. Both are necessary.
OWNER_INSTANCE VARCHAR(255) COMMENT 'Instance UUID corresponding to the party that has claimed the message',
OWNER_TICK BIGINT DEFAULT -1 COMMENT 'Used by recipient to determine which messages it has processed',
STATE ENUM('NEW', 'ACK', 'OK', 'ERR', 'DEAD')
NOT NULL DEFAULT 'NEW' COMMENT 'Processing state',
CREATED_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Time of creation',
UPDATED_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Time of last update',
TTL INT COMMENT 'Time to live in seconds'

View File

@ -0,0 +1,27 @@
-- Liveness record for each service, keyed by the full service name.
-- Column comments are single-quoted string literals: double-quoted text is
-- parsed as an identifier when sql_mode includes ANSI_QUOTES, which would
-- make this script fail.
CREATE TABLE IF NOT EXISTS SERVICE_HEARTBEAT (
    SERVICE_NAME   VARCHAR(255) PRIMARY KEY COMMENT 'Full name of the service, including node id if applicable, e.g. search-service:0',
    SERVICE_BASE   VARCHAR(255) NOT NULL COMMENT 'Base name of the service, e.g. search-service',
    INSTANCE       VARCHAR(255) NOT NULL COMMENT 'UUID of the service instance',
    ALIVE          BOOLEAN NOT NULL DEFAULT TRUE COMMENT 'Set to false when the service is doing an orderly shutdown',
    HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Service was last seen at this point'
);
-- Liveness and progress record for each batch process, keyed by the full
-- process name.
-- Column comments are single-quoted string literals: double-quoted text is
-- parsed as an identifier when sql_mode includes ANSI_QUOTES, which would
-- make this script fail.
CREATE TABLE IF NOT EXISTS PROCESS_HEARTBEAT (
    PROCESS_NAME   VARCHAR(255) PRIMARY KEY COMMENT 'Full name of the process, including node id if applicable, e.g. converter:0',
    PROCESS_BASE   VARCHAR(255) NOT NULL COMMENT 'Base name of the process, e.g. converter',
    INSTANCE       VARCHAR(255) NOT NULL COMMENT 'UUID of the process instance',
    STATUS         ENUM ('STARTING', 'RUNNING', 'STOPPED') NOT NULL DEFAULT 'STARTING' COMMENT 'Status of the process',
    PROGRESS       INT NOT NULL DEFAULT 0 COMMENT 'Progress of the process',
    HEARTBEAT_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Process was last seen at this point'
);
-- Append-only log of service events; ID is an auto-incremented surrogate key.
-- Column comments are single-quoted string literals: double-quoted text is
-- parsed as an identifier when sql_mode includes ANSI_QUOTES, which would
-- make this script fail.
CREATE TABLE IF NOT EXISTS SERVICE_EVENTLOG (
    ID            BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Unique id',
    SERVICE_NAME  VARCHAR(255) NOT NULL COMMENT 'Full name of the service, including node id if applicable, e.g. search-service:0',
    SERVICE_BASE  VARCHAR(255) NOT NULL COMMENT 'Base name of the service, e.g. search-service',
    INSTANCE      VARCHAR(255) NOT NULL COMMENT 'UUID of the service instance',
    EVENT_TIME    TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Event time',
    EVENT_TYPE    VARCHAR(255) NOT NULL COMMENT 'Event type',
    EVENT_MESSAGE VARCHAR(255) NOT NULL COMMENT 'Event message'
);

View File

@ -1,19 +1,17 @@
CREATE TABLE PROC_MESSAGE(
CREATE TABLE IF NOT EXISTS MESSAGE_QUEUE (
ID BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT 'Unique id',
RELATED_ID BIGINT NOT NULL DEFAULT -1 COMMENT 'Unique id a related message',
RELATED_ID BIGINT COMMENT 'Unique id a related message',
SENDER_INBOX VARCHAR(255) COMMENT 'Name of the sender inbox',
RECIPIENT_INBOX VARCHAR(255) NOT NULL COMMENT 'Name of the recipient inbox',
FUNCTION VARCHAR(255) NOT NULL COMMENT 'Which function to run',
PAYLOAD TEXT COMMENT 'Message to recipient',
-- These fields are used to avoid double processing of messages
-- instance marks the unique instance of the party, and the tick marks
-- the current polling iteration. Both are necessary.
OWNER_INSTANCE VARCHAR(255) COMMENT 'Instance UUID corresponding to the party that has claimed the message',
OWNER_TICK BIGINT DEFAULT -1 COMMENT 'Used by recipient to determine which messages it has processed',
STATE ENUM('NEW', 'ACK', 'OK', 'ERR', 'DEAD')
NOT NULL DEFAULT 'NEW' COMMENT 'Processing state',
CREATED_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Time of creation',
UPDATED_TIME TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6) COMMENT 'Time of last update',
TTL INT COMMENT 'Time to live in seconds'

View File

@ -25,7 +25,7 @@ public class MqPersistence {
public int reapDeadMessages() throws SQLException {
try (var conn = dataSource.getConnection();
var setToDead = conn.prepareStatement("""
UPDATE PROC_MESSAGE
UPDATE MESSAGE_QUEUE
SET STATE='DEAD', UPDATED_TIME=CURRENT_TIMESTAMP(6)
WHERE STATE IN ('NEW', 'ACK')
AND TTL IS NOT NULL
@ -39,7 +39,7 @@ public class MqPersistence {
public int cleanOldMessages() throws SQLException {
try (var conn = dataSource.getConnection();
var setToDead = conn.prepareStatement("""
DELETE FROM PROC_MESSAGE
DELETE FROM MESSAGE_QUEUE
WHERE STATE = 'OK'
AND TTL IS NOT NULL
AND TIMESTAMPDIFF(SECOND, UPDATED_TIME, CURRENT_TIMESTAMP(6)) > 3600
@ -67,7 +67,7 @@ public class MqPersistence {
) throws Exception {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO PROC_MESSAGE(RECIPIENT_INBOX, SENDER_INBOX, FUNCTION, PAYLOAD, TTL)
INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, SENDER_INBOX, FUNCTION, PAYLOAD, TTL)
VALUES(?, ?, ?, ?, ?)
""");
var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")) {
@ -97,7 +97,7 @@ public class MqPersistence {
public void updateMessageState(long id, MqMessageState mqMessageState) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
UPDATE PROC_MESSAGE
UPDATE MESSAGE_QUEUE
SET STATE=?, UPDATED_TIME=CURRENT_TIMESTAMP(6)
WHERE ID=?
""")) {
@ -118,14 +118,14 @@ public class MqPersistence {
conn.setAutoCommit(false);
try (var updateState = conn.prepareStatement("""
UPDATE PROC_MESSAGE
UPDATE MESSAGE_QUEUE
SET STATE=?, UPDATED_TIME=CURRENT_TIMESTAMP(6)
WHERE ID=?
""");
var addResponse = conn.prepareStatement("""
INSERT INTO PROC_MESSAGE(RECIPIENT_INBOX, RELATED_ID, FUNCTION, PAYLOAD)
INSERT INTO MESSAGE_QUEUE(RECIPIENT_INBOX, RELATED_ID, FUNCTION, PAYLOAD)
SELECT SENDER_INBOX, ID, ?, ?
FROM PROC_MESSAGE
FROM MESSAGE_QUEUE
WHERE ID=? AND SENDER_INBOX IS NOT NULL
""");
var lastIdQuery = conn.prepareStatement("SELECT LAST_INSERT_ID()")
@ -170,7 +170,7 @@ public class MqPersistence {
private int markInboxMessages(String inboxName, String instanceUUID, long tick) throws SQLException {
try (var conn = dataSource.getConnection();
var updateStmt = conn.prepareStatement("""
UPDATE PROC_MESSAGE
UPDATE MESSAGE_QUEUE
SET OWNER_INSTANCE=?, OWNER_TICK=?, UPDATED_TIME=CURRENT_TIMESTAMP(6), STATE='ACK'
WHERE RECIPIENT_INBOX=?
AND OWNER_INSTANCE IS NULL AND STATE='NEW'
@ -197,7 +197,7 @@ public class MqPersistence {
// Then fetch the messages that were marked
try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement("""
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM PROC_MESSAGE
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM MESSAGE_QUEUE
WHERE OWNER_INSTANCE=? AND OWNER_TICK=?
""")
) {
@ -242,8 +242,8 @@ public class MqPersistence {
// Then fetch the messages that were marked
try (var conn = dataSource.getConnection();
var queryStmt = conn.prepareStatement("""
SELECT SELF.ID, SELF.RELATED_ID, SELF.FUNCTION, SELF.PAYLOAD, PARENT.STATE FROM PROC_MESSAGE SELF
LEFT JOIN PROC_MESSAGE PARENT ON SELF.RELATED_ID=PARENT.ID
SELECT SELF.ID, SELF.RELATED_ID, SELF.FUNCTION, SELF.PAYLOAD, PARENT.STATE FROM MESSAGE_QUEUE SELF
LEFT JOIN MESSAGE_QUEUE PARENT ON SELF.RELATED_ID=PARENT.ID
WHERE SELF.OWNER_INSTANCE=? AND SELF.OWNER_TICK=?
""")
) {
@ -275,7 +275,7 @@ public class MqPersistence {
public List<MqMessage> lastNMessages(String inboxName, int lastN) throws SQLException {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM PROC_MESSAGE
SELECT ID, RELATED_ID, FUNCTION, PAYLOAD, STATE, SENDER_INBOX FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX = ?
ORDER BY ID DESC LIMIT ?
""")) {

View File

@ -20,7 +20,7 @@ public class MqTestUtil {
OWNER_INSTANCE, OWNER_TICK,
CREATED_TIME, UPDATED_TIME,
TTL
FROM PROC_MESSAGE
FROM MESSAGE_QUEUE
WHERE RECIPIENT_INBOX = ?
"""))
{

View File

@ -28,7 +28,7 @@ public class MqOutboxTest {
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("sql/current/11-message-queue.sql")
.withInitScript("sql/current/12-message-queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;

View File

@ -24,7 +24,7 @@ public class MqPersistenceTest {
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("sql/current/11-message-queue.sql")
.withInitScript("sql/current/12-message-queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;

View File

@ -27,7 +27,7 @@ public class StateMachineErrorTest {
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("sql/current/11-message-queue.sql")
.withInitScript("sql/current/12-message-queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;

View File

@ -28,7 +28,7 @@ public class StateMachineResumeTest {
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("sql/current/11-message-queue.sql")
.withInitScript("sql/current/12-message-queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;

View File

@ -24,7 +24,7 @@ public class StateMachineTest {
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withInitScript("sql/current/11-message-queue.sql")
.withInitScript("sql/current/12-message-queue.sql")
.withNetworkAliases("mariadb");
static HikariDataSource dataSource;

View File

@ -20,6 +20,7 @@ dependencies {
implementation libs.guava
implementation libs.guice
implementation libs.bundles.mariadb
implementation libs.commons.lang3
implementation libs.snakeyaml

View File

@ -0,0 +1,7 @@
package nu.marginalia;
import java.util.UUID;
/**
 * Identifies one running instance of a batch process.
 *
 * @param processName  base name of the process, e.g. "converter", "crawler" or "loader"
 * @param node         node number of the process
 * @param instanceUuid UUID of this particular process instance
 */
public record ProcessConfiguration(String processName, int node, UUID instanceUuid) {
}

View File

@ -0,0 +1,155 @@
package nu.marginalia.process.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.ProcessConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.concurrent.TimeUnit;
/**
 * Periodically records that a batch process is alive by writing to the
 * PROCESS_HEARTBEAT table.
 * <p>
 * A background thread first inserts (or takes over) the row for this process
 * with status STARTING, then refreshes HEARTBEAT_TIME, STATUS and PROGRESS once
 * per interval until {@link #shutDown()} marks the row STOPPED.  The interval is
 * read from the system property {@code mcp.heartbeat.interval} (in seconds) and
 * defaults to 1 second.
 */
@Singleton
public class ProcessHeartbeat {
    private final Logger logger = LoggerFactory.getLogger(ProcessHeartbeat.class);

    /** Full process name, "base:node". */
    private final String processName;
    /** Base process name without the node suffix. */
    private final String processBase;
    /** String form of the instance UUID; used to address this instance's row. */
    private final String instanceUUID;
    private final HikariDataSource dataSource;

    private final Thread runnerThread;

    /** Seconds between two heartbeat updates. */
    private final int heartbeatInterval = Integer.getInteger("mcp.heartbeat.interval", 1);

    private volatile boolean running = false;

    /** Progress in percent; stays at -1 until {@link #setProgress(double)} is called. */
    private volatile int progress = -1;

    /**
     * Creates the heartbeat and registers a JVM shutdown hook that calls
     * {@link #shutDown()}.  The background thread is not started until
     * {@link #start()} is called.
     *
     * @param configuration name, node and instance UUID of the process
     * @param dataSource    connection pool used for all heartbeat statements
     */
    @Inject
    public ProcessHeartbeat(ProcessConfiguration configuration,
                            HikariDataSource dataSource)
    {
        this.processName = configuration.processName() + ":" + configuration.node();
        this.processBase = configuration.processName();
        this.dataSource = dataSource;
        this.instanceUUID = configuration.instanceUuid().toString();

        runnerThread = new Thread(this::run);

        Runtime.getRuntime().addShutdownHook(new Thread(this::shutDown));
    }

    /**
     * Sets the progress reported with the next heartbeat.
     *
     * @param progress fraction of the work that is done; it is multiplied by 100
     *                 and truncated to an int before being stored
     */
    public void setProgress(double progress) {
        this.progress = (int) (progress * 100);
    }

    /** Starts the background heartbeat thread unless it is already flagged as running. */
    public void start() {
        if (!running) {
            runnerThread.start();
        }
    }

    /**
     * Stops the heartbeat thread, waits for it to finish and marks the process
     * as STOPPED in the database.  Does nothing when the heartbeat is not running.
     */
    public void shutDown() {
        if (!running)
            return;

        running = false;

        try {
            runnerThread.join();
            heartbeatStop();
        }
        catch (InterruptedException ex) {
            // Keep the interrupt visible to whoever called us
            Thread.currentThread().interrupt();
            logger.warn("ProcessHeartbeat shutdown failed", ex);
        }
        catch (SQLException ex) {
            logger.warn("ProcessHeartbeat shutdown failed", ex);
        }
    }

    /** Body of the heartbeat thread: register the process, then update until stopped. */
    private void run() {
        if (running)
            return;

        running = true;

        try {
            heartbeatInit();

            while (running) {
                try {
                    heartbeatUpdate();
                }
                catch (SQLException ex) {
                    // A single failed update is tolerated; try again on the next tick
                    logger.warn("ProcessHeartbeat failed to update", ex);
                }

                TimeUnit.SECONDS.sleep(heartbeatInterval);
            }
        }
        catch (InterruptedException|SQLException ex) {
            logger.error("ProcessHeartbeat caught irrecoverable exception, killing process", ex);

            // Clear the flag before exiting: System.exit() blocks this thread until
            // every shutdown hook has finished, and the hook registered in the
            // constructor would otherwise join() this very thread and hang the JVM.
            running = false;

            System.exit(255);
        }
    }

    /** Inserts the row for this process, or takes over an existing row with the same name. */
    private void heartbeatInit() throws SQLException {
        try (var connection = dataSource.getConnection();
             var stmt = connection.prepareStatement(
                     """
                     INSERT INTO PROCESS_HEARTBEAT (PROCESS_NAME, PROCESS_BASE, INSTANCE, HEARTBEAT_TIME, STATUS)
                     VALUES (?, ?, ?, CURRENT_TIMESTAMP(6), 'STARTING')
                     ON DUPLICATE KEY UPDATE
                     INSTANCE = ?,
                     HEARTBEAT_TIME = CURRENT_TIMESTAMP(6),
                     STATUS = 'STARTING'
                     """
             ))
        {
            stmt.setString(1, processName);
            stmt.setString(2, processBase);
            stmt.setString(3, instanceUUID);
            stmt.setString(4, instanceUUID);
            stmt.executeUpdate();
        }
    }

    /** Refreshes the heartbeat time and progress and marks this instance as RUNNING. */
    private void heartbeatUpdate() throws SQLException {
        try (var connection = dataSource.getConnection();
             var stmt = connection.prepareStatement(
                     """
                     UPDATE PROCESS_HEARTBEAT
                     SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), STATUS = 'RUNNING', PROGRESS = ?
                     WHERE INSTANCE = ?
                     """
             ))
        {
            stmt.setInt(1, progress);
            stmt.setString(2, instanceUUID);
            stmt.executeUpdate();
        }
    }

    /** Writes a final heartbeat that marks this instance as STOPPED. */
    private void heartbeatStop() throws SQLException {
        try (var connection = dataSource.getConnection();
             var stmt = connection.prepareStatement(
                     """
                     UPDATE PROCESS_HEARTBEAT
                     SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), STATUS='STOPPED', PROGRESS=?
                     WHERE INSTANCE = ?
                     """
             ))
        {
            stmt.setInt(1, progress);
            stmt.setString(2, instanceUUID);
            stmt.executeUpdate();
        }
    }
}

View File

@ -81,7 +81,7 @@ public class ServiceMonitors {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT SERVICE_BASE, TIMESTAMPDIFF(SECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6))
FROM PROC_SERVICE_HEARTBEAT
FROM SERVICE_HEARTBEAT
WHERE ALIVE=1
""")) {
try (var rs = stmt.executeQuery()) {

View File

@ -40,7 +40,7 @@ public class ServiceEventLog {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO PROC_SERVICE_EVENTLOG(SERVICE_NAME, SERVICE_BASE, INSTANCE, EVENT_TYPE, EVENT_MESSAGE)
INSERT INTO SERVICE_EVENTLOG(SERVICE_NAME, SERVICE_BASE, INSTANCE, EVENT_TYPE, EVENT_MESSAGE)
VALUES (?, ?, ?, ?, ?)
""")) {
stmt.setString(1, serviceName);

View File

@ -93,7 +93,7 @@ public class ServiceHeartbeat {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
INSERT INTO PROC_SERVICE_HEARTBEAT (SERVICE_NAME, SERVICE_BASE, INSTANCE, HEARTBEAT_TIME, ALIVE)
INSERT INTO SERVICE_HEARTBEAT (SERVICE_NAME, SERVICE_BASE, INSTANCE, HEARTBEAT_TIME, ALIVE)
VALUES (?, ?, ?, CURRENT_TIMESTAMP(6), 1)
ON DUPLICATE KEY UPDATE
INSTANCE = ?,
@ -115,7 +115,7 @@ public class ServiceHeartbeat {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE PROC_SERVICE_HEARTBEAT
UPDATE SERVICE_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6)
WHERE INSTANCE = ? AND ALIVE = 1
""")
@ -131,7 +131,7 @@ public class ServiceHeartbeat {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement(
"""
UPDATE PROC_SERVICE_HEARTBEAT
UPDATE SERVICE_HEARTBEAT
SET HEARTBEAT_TIME = CURRENT_TIMESTAMP(6), ALIVE = 0
WHERE INSTANCE = ?
""")

View File

@ -107,6 +107,18 @@ public class CrawlPlan {
throw new RuntimeException(ex);
}
}
/**
 * Counts the entries in the crawl work log.
 *
 * @return the number of log entries, or 0 if reading the log raises an
 *         IOException (indistinguishable from an empty log)
 */
public int countCrawledDomains() {
    try (Stream<WorkLogEntry> logEntries = WorkLog.streamLog(crawl.getLogFile())) {
        final long entryCount = logEntries
                .map(WorkLogEntry::path)
                .count();

        return (int) entryCount;
    }
    catch (IOException ex) {
        return 0;
    }
}
public void forEachCrawledDomain(Predicate<String> idReadPredicate, Consumer<CrawledDomain> consumer) {
final CrawledDomainReader reader = new CrawledDomainReader();

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import plan.CrawlPlanLoader;
@ -19,6 +20,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
public class ConverterMain {
@ -46,13 +48,21 @@ public class ConverterMain {
CrawlPlan plan,
DomainProcessor processor,
InstructionsCompiler compiler,
Gson gson
Gson gson,
ProcessHeartbeat heartbeat
) throws Exception {
heartbeat.start();
logger.info("Starting pipe");
try (WorkLog processLog = plan.createProcessWorkLog();
ConversionLog log = new ConversionLog(plan.process.getDir())) {
instructionWriter = new InstructionWriter(log, plan.process.getDir(), gson);
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
var pipe = new ParallelPipe<CrawledDomain, ProcessingInstructions>("Converter", 16, 4, 2) {
@Override
@ -78,6 +88,8 @@ public class ConverterMain {
String where = instructionWriter.accept(processedInstructions.id, instructions);
processLog.setJobToFinished(processedInstructions.id, where, instructions.size());
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
finally {
Thread.currentThread().setName("Converter:Receiver[IDLE]");
@ -86,6 +98,7 @@ public class ConverterMain {
};
plan.forEachCrawledDomain(id -> !processLog.isJobFinished(id), pipe::accept);
pipe.join();

View File

@ -4,10 +4,13 @@ import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import nu.marginalia.LanguageModels;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import plan.CrawlPlan;
import nu.marginalia.model.gson.GsonFactory;
import java.util.UUID;
public class ConverterModule extends AbstractModule {
private final CrawlPlan plan;
@ -21,6 +24,8 @@ public class ConverterModule extends AbstractModule {
bind(Gson.class).toInstance(createGson());
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("converter", 0, UUID.randomUUID()));
bind(Double.class).annotatedWith(Names.named("min-document-quality")).toInstance(-15.);
bind(Integer.class).annotatedWith(Names.named("min-document-length")).toInstance(250);
bind(Integer.class).annotatedWith(Names.named("max-title-length")).toInstance(128);

View File

@ -21,6 +21,7 @@ tasks.distZip.enabled = false
dependencies {
implementation project(':code:common:process')
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:common:config')
implementation project(':code:common:service')

View File

@ -1,10 +1,12 @@
package nu.marginalia.crawl;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.UserAgent;
import nu.marginalia.WmsaHome;
import nu.marginalia.crawl.retreival.fetcher.HttpFetcherImpl;
import nu.marginalia.crawl.retreival.fetcher.SitemapRetriever;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import plan.CrawlPlanLoader;
import plan.CrawlPlan;
import nu.marginalia.crawling.io.CrawledDomainWriter;
@ -20,7 +22,9 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class CrawlerMain implements AutoCloseable {
private final Logger logger = LoggerFactory.getLogger(getClass());
@ -45,6 +49,8 @@ public class CrawlerMain implements AutoCloseable {
AbortMonitor abortMonitor = AbortMonitor.getInstance();
Semaphore taskSem = new Semaphore(poolSize);
private static ProcessHeartbeat heartbeat;
public CrawlerMain(CrawlPlan plan) throws Exception {
this.plan = plan;
this.userAgent = WmsaHome.getUserAgent();
@ -77,9 +83,16 @@ public class CrawlerMain implements AutoCloseable {
}
var plan = new CrawlPlanLoader().load(Path.of(args[0]));
heartbeat = new ProcessHeartbeat(new ProcessConfiguration("crawler", 0, UUID.randomUUID()),
new DatabaseModule().provideConnection());
try (var crawler = new CrawlerMain(plan)) {
heartbeat.start();
crawler.run();
}
finally {
heartbeat.shutDown();
}
System.exit(0);
}
@ -87,12 +100,18 @@ public class CrawlerMain implements AutoCloseable {
public void run() throws InterruptedException {
// First a validation run to ensure the file is all good to parse
logger.info("Validating JSON");
plan.forEachCrawlingSpecification(unused -> {});
AtomicInteger countTotal = new AtomicInteger();
AtomicInteger countProcessed = new AtomicInteger();
plan.forEachCrawlingSpecification(unused -> countTotal.incrementAndGet());
logger.info("Let's go");
// TODO: Make this into an iterable instead so we can abort it
plan.forEachCrawlingSpecification(this::startCrawlTask);
plan.forEachCrawlingSpecification((spec) -> {
heartbeat.setProgress(countProcessed.incrementAndGet() / (double) countTotal.get());
startCrawlTask(spec);
});
}

View File

@ -5,6 +5,7 @@ import com.google.inject.Inject;
import com.google.inject.Injector;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.log.WorkLog;
import plan.CrawlPlanLoader;
import plan.CrawlPlan;
@ -32,9 +33,10 @@ public class LoaderMain {
private final LoaderFactory loaderFactory;
private final IndexLoadKeywords indexLoadKeywords;
private final ProcessHeartbeat heartbeat;
private volatile boolean running = true;
final Thread processorThread = new Thread(this::processor, "Processor Thread");
final Thread processorThread;
public static void main(String... args) throws IOException {
if (args.length != 1) {
@ -59,16 +61,23 @@ public class LoaderMain {
public LoaderMain(CrawlPlan plan,
ConvertedDomainReader instructionsReader,
HikariDataSource dataSource,
LoaderFactory loaderFactory, IndexLoadKeywords indexLoadKeywords) {
LoaderFactory loaderFactory,
IndexLoadKeywords indexLoadKeywords,
ProcessHeartbeat heartbeat
) {
this.plan = plan;
this.instructionsReader = instructionsReader;
this.loaderFactory = loaderFactory;
this.indexLoadKeywords = indexLoadKeywords;
this.heartbeat = heartbeat;
heartbeat.start();
nukeTables(dataSource);
Runtime.getRuntime().addShutdownHook(new Thread(this::shutDownIndex));
processorThread = new Thread(this::processor, "Processor Thread");
processorThread.start();
}
@ -97,17 +106,26 @@ public class LoaderMain {
public void run() {
var logFile = plan.process.getLogFile();
AtomicInteger loadTotal = new AtomicInteger();
WorkLog.readLog(logFile, entry -> { loadTotal.incrementAndGet(); });
LoaderMain.loadTotal = loadTotal.get();
try {
AtomicInteger loadTotal = new AtomicInteger();
WorkLog.readLog(logFile, entry -> {
loadTotal.incrementAndGet();
});
LoaderMain.loadTotal = loadTotal.get();
WorkLog.readLog(logFile, entry -> {
load(plan, entry.path(), entry.cnt());
});
AtomicInteger loaded = new AtomicInteger();
WorkLog.readLog(logFile, entry -> {
heartbeat.setProgress(loaded.incrementAndGet() / (double) loadTotal.get());
running = false;
processorThread.join();
load(plan, entry.path(), entry.cnt());
});
running = false;
processorThread.join();
}
finally {
heartbeat.shutDown();
}
System.exit(0);
}

View File

@ -4,6 +4,7 @@ import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.name.Names;
import nu.marginalia.LanguageModels;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.WmsaHome;
import plan.CrawlPlan;
import nu.marginalia.model.gson.GsonFactory;
@ -11,6 +12,7 @@ import nu.marginalia.service.SearchServiceDescriptors;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import java.nio.file.Path;
import java.util.UUID;
public class LoaderModule extends AbstractModule {
@ -24,6 +26,7 @@ public class LoaderModule extends AbstractModule {
bind(CrawlPlan.class).toInstance(plan);
bind(ServiceDescriptors.class).toInstance(SearchServiceDescriptors.descriptors);
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("loader", 0, UUID.randomUUID()));
bind(Gson.class).toInstance(createGson());

View File

@ -29,6 +29,7 @@ public class ControlService extends Service {
private final ServiceMonitors monitors;
private final MustacheRenderer<Object> indexRenderer;
private final MustacheRenderer<Map<?,?>> servicesRenderer;
private final MustacheRenderer<Map<?,?>> processesRenderer;
private final MustacheRenderer<Map<?,?>> eventsRenderer;
private final MustacheRenderer<Map<?,?>> messageQueueRenderer;
private final MqPersistence messageQueuePersistence;
@ -54,6 +55,7 @@ public class ControlService extends Service {
indexRenderer = rendererFactory.renderer("control/index");
servicesRenderer = rendererFactory.renderer("control/services");
processesRenderer = rendererFactory.renderer("control/processes");
eventsRenderer = rendererFactory.renderer("control/events");
messageQueueRenderer = rendererFactory.renderer("control/message-queue");
@ -62,12 +64,13 @@ public class ControlService extends Service {
Spark.get("/public/heartbeats", (req, res) -> {
res.type("application/json");
return heartbeatService.getHeartbeats();
return heartbeatService.getServiceHeartbeats();
}, gson::toJson);
Spark.get("/public/", (req, rsp) -> indexRenderer.render(Map.of()));
Spark.get("/public/services", (req, rsp) -> servicesRenderer.render(Map.of("heartbeats", heartbeatService.getHeartbeats())));
Spark.get("/public/services", (req, rsp) -> servicesRenderer.render(Map.of("heartbeats", heartbeatService.getServiceHeartbeats())));
Spark.get("/public/processes", (req, rsp) -> processesRenderer.render(Map.of("heartbeats", heartbeatService.getProcessHeartbeats())));
Spark.get("/public/events", (req, rsp) -> eventsRenderer.render(Map.of("events", eventLogService.getLastEntries(20))));
Spark.get("/public/message-queue", (req, rsp) -> messageQueueRenderer.render(Map.of("messages", messageQueueViewService.getLastEntries(20))));

View File

@ -23,7 +23,7 @@ public class EventLogService {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT SERVICE_NAME, INSTANCE, EVENT_TIME, EVENT_TYPE, EVENT_MESSAGE
FROM PROC_SERVICE_EVENTLOG ORDER BY ID DESC LIMIT ?
FROM SERVICE_EVENTLOG ORDER BY ID DESC LIMIT ?
""")) {
query.setInt(1, n);

View File

@ -3,6 +3,7 @@ package nu.marginalia.control;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.control.model.ProcessHeartbeat;
import nu.marginalia.control.model.ServiceHeartbeat;
import java.sql.SQLException;
@ -18,14 +19,14 @@ public class HeartbeatService {
this.dataSource = dataSource;
}
public List<ServiceHeartbeat> getHeartbeats() {
public List<ServiceHeartbeat> getServiceHeartbeats() {
List<ServiceHeartbeat> heartbeats = new ArrayList<>();
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
SELECT SERVICE_NAME, SERVICE_BASE, INSTANCE, ALIVE,
TIMESTAMPDIFF(MICROSECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) AS TSDIFF
FROM PROC_SERVICE_HEARTBEAT
FROM SERVICE_HEARTBEAT
""")) {
var rs = stmt.executeQuery();
@ -46,6 +47,34 @@ public class HeartbeatService {
return heartbeats;
}
/**
 * Reads every row of PROCESS_HEARTBEAT.
 *
 * @return one entry per row, with the instance UUID shortened by trimUUID and
 *         the time since the last heartbeat expressed in milliseconds
 * @throws RuntimeException wrapping any SQLException raised by the query
 */
public List<ProcessHeartbeat> getProcessHeartbeats() {
    List<ProcessHeartbeat> heartbeats = new ArrayList<>();

    try (var conn = dataSource.getConnection();
         var stmt = conn.prepareStatement("""
             SELECT PROCESS_NAME, PROCESS_BASE, INSTANCE, STATUS, PROGRESS,
             TIMESTAMPDIFF(MICROSECOND, HEARTBEAT_TIME, CURRENT_TIMESTAMP(6)) AS TSDIFF
             FROM PROCESS_HEARTBEAT
             """);
         var rs = stmt.executeQuery())
    {
        while (rs.next()) {
            heartbeats.add(new ProcessHeartbeat(
                    rs.getString("PROCESS_NAME"),
                    rs.getString("PROCESS_BASE"),
                    trimUUID(rs.getString("INSTANCE")),
                    // TSDIFF is in microseconds and passes Integer.MAX_VALUE once a
                    // heartbeat is roughly 36 minutes old (e.g. a stopped process),
                    // so it has to be read as a long.
                    rs.getLong("TSDIFF") / 1000.,
                    rs.getInt("PROGRESS"),
                    rs.getString("STATUS")
            ));
        }
    }
    catch (SQLException ex) {
        throw new RuntimeException(ex);
    }

    return heartbeats;
}
private String trimUUID(String uuid) {
if (uuid.length() > 8) {
return uuid.substring(0, 8);

View File

@ -23,7 +23,7 @@ public class MessageQueueViewService {
try (var conn = dataSource.getConnection();
var query = conn.prepareStatement("""
SELECT ID, RELATED_ID, SENDER_INBOX, RECIPIENT_INBOX, FUNCTION, OWNER_INSTANCE, OWNER_TICK, STATE, CREATED_TIME, UPDATED_TIME, TTL
FROM PROC_MESSAGE
FROM MESSAGE_QUEUE
ORDER BY ID DESC
LIMIT ?
""")) {

View File

@ -0,0 +1,25 @@
package nu.marginalia.control.model;
/**
 * One process heartbeat row, with helpers for rendering it.
 *
 * @param processId      full name of the process
 * @param processBase    base name of the process
 * @param uuid           instance UUID of the process
 * @param lastSeenMillis time since the last heartbeat, in milliseconds
 * @param progress       progress value, used as a percentage by {@link #progressStyle()}
 * @param status         status string, e.g. "RUNNING" or "STOPPED"
 */
public record ProcessHeartbeat(
        String processId,
        String processBase,
        String uuid,
        double lastSeenMillis,
        int progress,
        String status
) {
    /** True when the last heartbeat is more than 10000 ms old. */
    public boolean isMissing() {
        return 10000 < lastSeenMillis;
    }

    /** True when the status is exactly "STOPPED". */
    public boolean isStopped() {
        return "STOPPED".equals(status);
    }

    /**
     * Inline CSS that draws the progress as a background gradient.
     *
     * @return a gradient declaration for a RUNNING process with positive
     *         progress, otherwise the empty string
     */
    public String progressStyle() {
        final boolean showBar = progress > 0 && "RUNNING".equals(status);
        if (!showBar) {
            return "";
        }

        final String gradientTemplate = """
                background: linear-gradient(90deg, #ccc %d%%, #ccc %d%%, #fff %d%%)
                """;
        return gradientTemplate.formatted(progress, progress, progress);
    }
}

View File

@ -1,5 +1,5 @@
body {
font-family: serif;
font-family: sans-serif;
line-height: 1.6;
display: grid;
@ -8,6 +8,17 @@ body {
grid-template-areas:
"left right";
}
h1 {
font-family: serif;
}
table {
font-family: monospace;
}
th { text-align: left; }
td,th { padding-right: 1ch; border: 1px solid #ccc; }
tr:nth-last-of-type(2n) {
background-color: #eee;
}
body > nav {
grid-area: left;
}

View File

@ -4,6 +4,7 @@
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<meta http-equiv="refresh" content="5">
</head>
<body>
{{> control/partials/nav}}

View File

@ -4,6 +4,7 @@
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<meta http-equiv="refresh" content="5">
</head>
<body>
{{> control/partials/nav}}

View File

@ -4,6 +4,7 @@
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<meta http-equiv="refresh" content="5">
</head>
<body>
{{> control/partials/nav}}

View File

@ -2,8 +2,8 @@
<ul>
<li><a href="/">Overview</a></li>
<li><a href="services">Services</a></li>
<li><a href="processes">Processes</a></li>
<li><a href="events">Events</a></li>
<li><a href="message-queue">Message Queue</a></li>
<li><a href="processes">Processes</a></li>
</ul>
</nav>

View File

@ -0,0 +1,34 @@
<!DOCTYPE html>
<html>
<head>
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<meta http-equiv="refresh" content="5">
</head>
<body>
{{> control/partials/nav}}
<section>
<h1>Processes</h1>
<table id="heartbeats">
<tr>
<th>Process ID</th>
<th>UUID</th>
<th>Status</th>
<th>Progress</th>
<th>Last Seen (ms)</th>
</tr>
{{#each heartbeats}}
<tr class="{{#if isMissing}}missing{{/if}}" style="{{progressStyle}}">
<td>{{processId}}</td>
<td>{{uuid}}</td>
<td>{{status}}</td>
<td>{{progress}}</td>
<td>{{#unless isStopped}}{{lastSeenMillis}}{{/unless}}</td>
</tr>
{{/each}}
</table>
</section>
</body>
</html>

View File

@ -4,6 +4,7 @@
<title>Control Service</title>
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<link rel="stylesheet" href="style.css" />
<meta http-equiv="refresh" content="5">
</head>
<body>
{{> control/partials/nav}}