(adjacencies) Clean up AdjacenciesLoader

Make JDBC batching more consistent, also adds a test case for the loader.
This commit is contained in:
Viktor Lofgren 2023-12-21 14:14:22 +01:00
parent b6253b03c2
commit dc773c5c20
4 changed files with 112 additions and 11 deletions

View File

@ -34,4 +34,8 @@ dependencies {
testImplementation libs.bundles.slf4j.test testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit testImplementation libs.bundles.junit
testImplementation libs.mockito testImplementation libs.mockito
testImplementation platform('org.testcontainers:testcontainers-bom:1.17.4')
testImplementation 'org.testcontainers:mariadb:1.17.4'
testImplementation 'org.testcontainers:junit-jupiter:1.17.4'
} }

View File

@ -13,17 +13,16 @@ public class AdjacenciesLoader {
private static final Logger logger = LoggerFactory.getLogger(AdjacenciesLoader.class); private static final Logger logger = LoggerFactory.getLogger(AdjacenciesLoader.class);
final HikariDataSource dataSource; private final HikariDataSource dataSource;
final LinkedBlockingDeque<WebsiteAdjacenciesCalculator.DomainSimilarities> similaritiesLinkedBlockingDeque = new LinkedBlockingDeque<>(100); private final LinkedBlockingDeque<WebsiteAdjacenciesCalculator.DomainSimilarities> similaritiesLinkedBlockingDeque = new LinkedBlockingDeque<>(100);
final Thread loaderThread; private final Thread loaderThread;
volatile boolean running = true; volatile boolean running = true;
public AdjacenciesLoader(HikariDataSource dataSource) { public AdjacenciesLoader(HikariDataSource dataSource) {
this.dataSource = dataSource; this.dataSource = dataSource;
loaderThread = new Thread(this::insertThreadRun, "Adjacencies Loader Thread"); loaderThread = Thread.ofPlatform().name("Adjacencies Loader Thread").start(this::insertThreadRun);
loaderThread.start();
} }
@SneakyThrows @SneakyThrows
@ -47,18 +46,30 @@ public class AdjacenciesLoader {
INSERT INTO EC_DOMAIN_NEIGHBORS_TMP (DOMAIN_ID, NEIGHBOR_ID, RELATEDNESS) VALUES (?, ?, ?) INSERT INTO EC_DOMAIN_NEIGHBORS_TMP (DOMAIN_ID, NEIGHBOR_ID, RELATEDNESS) VALUES (?, ?, ?)
""")) """))
{ {
while (running || !similaritiesLinkedBlockingDeque.isEmpty()) { int itemCount = 0;
var item = similaritiesLinkedBlockingDeque.pollFirst(1, TimeUnit.SECONDS);
if (item == null) continue;
for (; item != null; item = similaritiesLinkedBlockingDeque.pollFirst()) { while (running || !similaritiesLinkedBlockingDeque.isEmpty()) {
for (var item = similaritiesLinkedBlockingDeque.pollFirst(1, TimeUnit.SECONDS);
item != null;
item = similaritiesLinkedBlockingDeque.pollFirst())
{
for (var similarity : item.similarities()) { for (var similarity : item.similarities()) {
stmt.setInt(1, item.domainId()); stmt.setInt(1, item.domainId());
stmt.setInt(2, similarity.domainId()); stmt.setInt(2, similarity.domainId());
stmt.setDouble(3, similarity.value()); stmt.setDouble(3, similarity.value());
stmt.addBatch(); stmt.addBatch();
itemCount++;
}
if (itemCount++ > 1000) {
stmt.executeBatch();
itemCount = 0;
} }
} }
}
// Flush remaining items
if (itemCount > 0) {
stmt.executeBatch(); stmt.executeBatch();
} }
} }

View File

@ -134,8 +134,8 @@ public class WebsiteAdjacenciesCalculator {
return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights)); return weightedProduct(weights, a, b) / Math.sqrt(a.mulAndSum(weights) * b.mulAndSum(weights));
} }
record DomainSimilarities(int domainId, List<DomainSimilarity> similarities) {}; public record DomainSimilarities(int domainId, List<DomainSimilarity> similarities) {};
record DomainSimilarity(int domainId, double value) {}; public record DomainSimilarity(int domainId, double value) {};
@SneakyThrows @SneakyThrows
private void findAdjacentDtoS(int domainId, Consumer<DomainSimilarities> andThen) { private void findAdjacentDtoS(int domainId, Consumer<DomainSimilarities> andThen) {

View File

@ -0,0 +1,86 @@
package nu.marginalia.adjacencies;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.parallel.Execution;
import org.testcontainers.containers.MariaDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import java.util.List;
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
@Testcontainers
@Execution(SAME_THREAD)
@Tag("slow")
public class AdjacenciesLoaderTest {
private static HikariDataSource dataSource;
@Container
static MariaDBContainer<?> mariaDBContainer = new MariaDBContainer<>("mariadb")
.withDatabaseName("WMSA_prod")
.withUsername("wmsa")
.withPassword("wmsa")
.withNetworkAliases("mariadb");
@BeforeAll
public static void setup() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl(mariaDBContainer.getJdbcUrl());
config.setUsername("wmsa");
config.setPassword("wmsa");
dataSource = new HikariDataSource(config);
try (var conn = dataSource.getConnection();
var stmt = conn.createStatement()) {
stmt.execute("""
CREATE TABLE EC_DOMAIN_NEIGHBORS_2 (
DOMAIN_ID INT NOT NULL,
NEIGHBOR_ID INT NOT NULL,
RELATEDNESS DOUBLE NOT NULL,
PRIMARY KEY (DOMAIN_ID, NEIGHBOR_ID)
)
""");
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@AfterAll
public static void teardown() {
dataSource.close();
mariaDBContainer.close();
}
@Test
public void testLoad() {
var loader = new AdjacenciesLoader(dataSource);
try {
loader.load(new WebsiteAdjacenciesCalculator.DomainSimilarities(1,
List.of(new WebsiteAdjacenciesCalculator.DomainSimilarity(2, 0.5),
new WebsiteAdjacenciesCalculator.DomainSimilarity(3, 0.6)
)));
loader.stop();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("SELECT * FROM EC_DOMAIN_NEIGHBORS_2 WHERE DOMAIN_ID=1");
) {
var rs = stmt.executeQuery();
Assertions.assertTrue(rs.next());
Assertions.assertEquals(2, rs.getInt(2));
Assertions.assertEquals(0.5, rs.getDouble(3));
Assertions.assertTrue(rs.next());
Assertions.assertEquals(3, rs.getInt(2));
Assertions.assertEquals(0.6, rs.getDouble(3));
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}