Merge pull request #79 from MarginaliaSearch/reddit

(converter) Loader for reddit data

Adds experimental sideloading support for pushshift.io style reddit data. The dataset is limited to data older than 2023, as licensing changes have since made large-scale data extraction from Reddit difficult.

Since the median post quality on reddit is not very good, the sideloader will only load a subset of self-texts and top-level comments that have sufficiently many upvotes. Empirically this appears to mostly return good matches, even if it probably could index more.
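For reference, the thresholds amount to roughly the following filter (an illustrative paraphrase, not the actual code; the real checks live in RedditDb.create in the diff below):

// Illustrative paraphrase of the quality gate applied during sqlite conversion:
// keep text only when the score exceeds ~10 and the text runs past ~1000
// characters; comments must additionally be top-level (parent id "t3_...").
static boolean passesQualityGate(int score, String text, String parentId) {
    if (score < 10) return false;
    if (text == null || text.length() < 1000) return false;
    return parentId == null || parentId.startsWith("t3");
}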

Tests were written for this, but they all require local reddit data, which can't be distributed with the source code. If the data cannot be found, the tests short-circuit as OK. They're mostly there for debugging, and it's fine if they don't always run.

The change also refactors the sideloading code, which had grown messy, and slightly improves the sideload UX.
Viktor 2024-02-15 09:17:56 +01:00 committed by GitHub
commit d970836605
36 changed files with 1119 additions and 100 deletions

View File

@ -142,7 +142,13 @@ public class ExecutorClient extends AbstractDynamicClient {
.build()
);
}
public void sideloadReddit(int node, Path sourcePath) {
stubPool.apiForNode(node).sideloadReddit(
RpcSideloadReddit.newBuilder()
.setSourcePath(sourcePath.toString())
.build()
);
}
public void sideloadWarc(int node, Path sourcePath) {
stubPool.apiForNode(node).sideloadWarc(
RpcSideloadWarc.newBuilder()

View File

@ -20,6 +20,7 @@ service ExecutorApi {
rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {}
rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {}
rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {}
rpc sideloadReddit(RpcSideloadReddit) returns (Empty) {}
rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {}
rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {}
@ -58,6 +59,9 @@ message RpcSideloadDirtree {
message RpcSideloadWarc {
string sourcePath = 1;
}
message RpcSideloadReddit {
string sourcePath = 1;
}
message RpcSideloadStackexchange {
string sourcePath = 1;
}

View File

@ -5,5 +5,6 @@ public enum ConvertAction {
SideloadEncyclopedia,
SideloadDirtree,
SideloadWarc,
SideloadReddit,
SideloadStackexchange
}

View File

@ -1,6 +1,7 @@
package nu.marginalia.mqapi.converting;
import lombok.AllArgsConstructor;
import lombok.NonNull;
import nu.marginalia.storage.model.FileStorageId;
import java.nio.file.Path;
@ -13,6 +14,13 @@ public class ConvertRequest {
public final FileStorageId processedDataStorage;
public final String baseUrl;
public Path getInputPath() {
if (inputSource == null)
return null;
return Path.of(inputSource);
}
public static ConvertRequest forCrawlData(FileStorageId sourceId, FileStorageId destId) {
return new ConvertRequest(
ConvertAction.ConvertCrawlData,
@ -45,7 +53,13 @@ public class ConvertRequest {
destId,
null);
}
public static ConvertRequest forReddit(Path sourcePath, FileStorageId destId) {
return new ConvertRequest(ConvertAction.SideloadReddit,
sourcePath.toString(),
null,
destId,
null);
}
public static ConvertRequest forStackexchange(Path sourcePath, FileStorageId destId) {
return new ConvertRequest(ConvertAction.SideloadStackexchange,
sourcePath.toString(),

View File

@ -0,0 +1,39 @@
plugins {
id 'java'
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
}
dependencies {
implementation libs.bundles.slf4j
implementation project(':code:libraries:blocking-thread-pool')
implementation project(':code:common:model')
implementation libs.notnull
implementation libs.jsoup
implementation libs.sqlite
implementation libs.guice
implementation libs.guava
implementation libs.gson
implementation libs.zstd
implementation libs.trove
implementation libs.commons.compress
implementation libs.xz
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
maxHeapSize = "8G"
useJUnitPlatform()
}

View File

@ -0,0 +1,60 @@
package nu.marginalia.integration.reddit;
import com.github.luben.zstd.ZstdInputStream;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.stream.JsonReader;
import nu.marginalia.integration.reddit.model.RawRedditComment;
import nu.marginalia.integration.reddit.model.RawRedditSubmission;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
public class RedditEntryReader {
public static EntryIterator<RawRedditSubmission> readSubmissions(Path file) throws IOException {
return new EntryIterator<>(file, RawRedditSubmission.class);
}
public static EntryIterator<RawRedditComment> readComments(Path file) throws IOException {
return new EntryIterator<>(file, RawRedditComment.class);
}
public static class EntryIterator<T> implements Iterator<T>, AutoCloseable {
private final JsonReader reader;
private final Class<T> type;
private final Gson gson = new GsonBuilder().create();
private EntryIterator(Path file, Class<T> type) throws IOException {
this.type = type;
reader = new JsonReader(new InputStreamReader(new ZstdInputStream(Files.newInputStream(file, StandardOpenOption.READ))));
// Set the reader to be lenient to allow multiple top level objects
reader.setLenient(true);
}
@Override
public boolean hasNext() {
try {
return reader.hasNext();
} catch (IOException e) {
e.printStackTrace();
return false;
}
}
@Override
public T next() {
return gson.fromJson(reader, type);
}
@Override
public void close() throws IOException {
reader.close();
}
}
}
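
A minimal usage sketch (file name illustrative; the lenient JsonReader above is what lets the iterator stream a file of concatenated top-level JSON objects):

// Caller declares throws IOException; EntryIterator.close() can throw it.
try (var iter = RedditEntryReader.readSubmissions(Path.of("weightroom_submissions.zst"))) {
    while (iter.hasNext()) {
        RawRedditSubmission submission = iter.next();
        System.out.println(submission.title);
    }
}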

View File

@ -0,0 +1,203 @@
package nu.marginalia.integration.reddit.db;
import com.google.common.base.Strings;
import lombok.SneakyThrows;
import nu.marginalia.integration.reddit.RedditEntryReader;
import nu.marginalia.integration.reddit.model.ProcessableRedditComment;
import nu.marginalia.integration.reddit.model.ProcessableRedditSubmission;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.*;
import java.util.Iterator;
/** This class is responsible for creating and accessing a sqlite database with reddit data, for
* easier aggregation and subsequent processing. */
public class RedditDb {
public static void create(Path submissionsFile, Path commentsFile, Path dbFile) throws IOException, SQLException
{
Files.deleteIfExists(dbFile);
try (var connection = DriverManager.getConnection(STR."jdbc:sqlite:\{dbFile}");
var stream = ClassLoader.getSystemResourceAsStream("db/reddit.sql");
var updateStmt = connection.createStatement()
) {
var sql = new String(stream.readAllBytes());
String[] sqlParts = sql.split(";");
for (var part : sqlParts) {
if (part.isBlank()) {
continue;
}
updateStmt.executeUpdate(part);
}
updateStmt.execute("PRAGMA synchronous = OFF");
try (var iter = RedditEntryReader.readSubmissions(submissionsFile);
var stmt = connection.prepareStatement("""
INSERT OR IGNORE INTO submission(id, author, created_utc, score, title, selftext, subreddit, permalink)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""")
) {
while (iter.hasNext()) {
var submission = iter.next();
if (Strings.isNullOrEmpty(submission.name))
continue;
stmt.setString(1, submission.name);
stmt.setString(2, submission.author);
stmt.setInt(3, submission.created_utc);
stmt.setInt(4, submission.score);
stmt.setString(5, submission.title);
if (submission.score > 10 && submission.selftext.length() > 1000) {
stmt.setString(6, submission.selftext);
} else {
stmt.setString(6, "");
}
stmt.setString(7, submission.subreddit);
stmt.setString(8, submission.permalink);
stmt.executeUpdate();
}
}
try (var iter = RedditEntryReader.readComments(commentsFile);
var stmt = connection.prepareStatement("""
INSERT OR IGNORE INTO comment(id, author, score, body, threadId)
VALUES (?, ?, ?, ?, ?)
""")
)
{
while (iter.hasNext()) {
var comment = iter.next();
if (comment.body.length() < 1000) continue;
if (comment.score < 10) continue;
// We only want to store top-level comments
if (!comment.parent_id.startsWith("t3")) continue;
stmt.setString(1, comment.id);
stmt.setString(2, comment.author);
stmt.setInt(3, comment.score);
stmt.setString(4, comment.body);
stmt.setString(5, comment.parent_id);
stmt.executeUpdate();
}
}
}
}
public static SubmissionIterator getSubmissions(Path file) throws SQLException {
var connection = DriverManager.getConnection(STR."jdbc:sqlite:\{file}");
return new SubmissionIterator(connection);
}
public static CommentIterator getComments(Path file) throws SQLException {
var connection = DriverManager.getConnection(STR."jdbc:sqlite:\{file}");
return new CommentIterator(connection);
}
public static class SubmissionIterator extends SqlQueryIterator<ProcessableRedditSubmission> {
SubmissionIterator(Connection connection) throws SQLException {
super(connection, """
SELECT id, author, created_utc, score, title, selftext, subreddit, permalink
FROM submission
WHERE length(selftext) > 0
""");
}
@Override
ProcessableRedditSubmission nextFromResultSet(ResultSet resultSet) throws SQLException {
return new ProcessableRedditSubmission(resultSet.getString("subreddit"),
resultSet.getString("id"),
resultSet.getString("author"),
resultSet.getString("title"),
resultSet.getString("selftext"),
resultSet.getInt("created_utc"),
resultSet.getString("permalink"),
resultSet.getInt("score")
);
}
}
public static class CommentIterator extends SqlQueryIterator<ProcessableRedditComment> {
CommentIterator(Connection connection) throws SQLException {
super(connection, """
select submission.subreddit,
comment.id,
comment.author,
submission.title,
body,
created_utc,
permalink,
comment.score
from comment
inner join submission on threadId=submission.id
""");
}
@Override
ProcessableRedditComment nextFromResultSet(ResultSet resultSet) throws SQLException {
return new ProcessableRedditComment(
resultSet.getString("subreddit"),
resultSet.getString("id"),
resultSet.getString("author"),
resultSet.getString("title"),
resultSet.getString("body"),
resultSet.getInt("created_utc"),
resultSet.getString("permalink") + resultSet.getString("id"),
resultSet.getInt("score")
);
}
}
static abstract class SqlQueryIterator<T> implements Iterator<T>, AutoCloseable {
private final PreparedStatement stmt;
private final ResultSet resultSet;
private Boolean hasNext = null;
SqlQueryIterator(Connection connection, String query) throws SQLException {
// This is sql-injection safe since the query is not user input:
stmt = connection.prepareStatement(query);
resultSet = stmt.executeQuery();
}
@Override
public void close() throws Exception {
resultSet.close();
stmt.close();
}
@SneakyThrows
@Override
public boolean hasNext() {
if (hasNext != null)
return hasNext;
hasNext = resultSet.next();
return hasNext;
}
abstract T nextFromResultSet(ResultSet resultSet) throws SQLException;
@SneakyThrows
@Override
public T next() {
if (!hasNext())
throw new IllegalStateException();
else hasNext = null;
return nextFromResultSet(resultSet);
}
}
}
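
The intended flow, sketched (paths are illustrative; the disabled tests further down exercise the same sequence):

// One-time conversion of a pushshift dump pair into a sqlite database:
RedditDb.create(submissionsZst, commentsZst, dbFile);
// Later, stream the pre-filtered records back out. SqlQueryIterator.close()
// throws Exception, so the caller handles or declares it:
try (var submissions = RedditDb.getSubmissions(dbFile)) {
    while (submissions.hasNext()) {
        ProcessableRedditSubmission record = submissions.next();
        // ... feed into the converter
    }
}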

View File

@ -0,0 +1,19 @@
package nu.marginalia.integration.reddit.model;
import lombok.AllArgsConstructor;
import lombok.ToString;
/** A projection of a Reddit comment joined with its top level submission
* that is ready for processing. */
@AllArgsConstructor
@ToString
public class ProcessableRedditComment {
public String subreddit;
public String name;
public String author;
public String title;
public String body;
public int created_utc;
public String permalink;
public int score;
}

View File

@ -0,0 +1,17 @@
package nu.marginalia.integration.reddit.model;
import lombok.AllArgsConstructor;
import lombok.ToString;
/** A projection of a Reddit top level submission that is appropriate for processing. */
@AllArgsConstructor @ToString
public class ProcessableRedditSubmission {
public String subreddit;
public String name;
public String author;
public String title;
public String selftext;
public int created_utc;
public String permalink;
public int score;
}

View File

@ -0,0 +1,20 @@
package nu.marginalia.integration.reddit.model;
import lombok.AllArgsConstructor;
import lombok.ToString;
import lombok.With;
/** Corresponds directly to the pushshift.io Reddit comment JSON format. */
@AllArgsConstructor
@ToString
@With
public class RawRedditComment {
public String parent_id;
public String link_id;
public String id;
public String author;
public String body;
public String subreddit;
public int score;
}

View File

@ -0,0 +1,22 @@
package nu.marginalia.integration.reddit.model;
import lombok.AllArgsConstructor;
import lombok.ToString;
import lombok.With;
/** Corresponds directly to the pushshift.io Reddit submission JSON format. */
@AllArgsConstructor
@With
@ToString
public class RawRedditSubmission {
public int score;
public String subreddit;
public String name;
public String author;
public String title;
public String selftext;
public int num_comments;
public int created_utc;
public String permalink;
}

View File

@ -0,0 +1,25 @@
CREATE TABLE submission (
id TEXT PRIMARY KEY,
author TEXT,
subreddit TEXT,
title TEXT,
selftext TEXT,
score INTEGER,
created_utc INTEGER,
num_comments INTEGER,
permalink TEXT
);
CREATE TABLE comment (
id TEXT PRIMARY KEY,
threadId TEXT,
author TEXT,
subreddit TEXT,
title TEXT,
body TEXT,
score INTEGER,
num_comments INTEGER
);
CREATE INDEX submission_id ON submission(id);
CREATE INDEX comment_id ON comment(id);

View File

@ -0,0 +1,54 @@
package nu.marginalia.integration.reddit;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
class RedditEntryReaderTest {
/** This test case exists for debugging reddit sideloading. It requires local reddit data
* and is not part of the normal test suite. Point the dbPath variable at a directory
* containing reddit data.
* */
Path dbPath = Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/");
@Test
void readSubmissions() throws IOException {
if (!Files.exists(dbPath))
return;
try (var iter = RedditEntryReader.readSubmissions(dbPath.resolve("weightroom_submissions.zst"))) {
for (int i = 0; iter.hasNext() && i<50; ) {
var entry = iter.next();
if (entry.selftext.length() > 1000 && entry.score > 10) {
System.out.println(entry);
i++;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Test
void readComments() throws IOException {
if (!Files.exists(dbPath))
return;
try (var iter = RedditEntryReader.readComments(dbPath.resolve("weightroom_comments.zst"))) {
for (int i = 0; iter.hasNext() && i<50; ) {
var entry = iter.next();
if (entry.body.length() > 1000 && entry.score > 10 && entry.parent_id.startsWith("t1")) {
System.out.println(entry);
i++;
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -0,0 +1,56 @@
package nu.marginalia.integration.reddit.db;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import static org.junit.jupiter.api.Assertions.*;
class RedditDbTest {
@Disabled
@Test
void create() throws SQLException, IOException {
RedditDb.create(
Path.of("/home/vlofgren/Exports/reddit/weightroom_submissions.zst"),
Path.of("/home/vlofgren/Exports/reddit/weightroom_comments.zst"),
Path.of("/tmp/reddit.db")
);
}
@Disabled
@Test
void readSubmissions() {
try (var iter = RedditDb.getSubmissions(Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/weightroom.8eda94e.db"))) {
for (int i = 0; i < 10; i++) {
System.out.println(iter.next());
}
} catch (SQLException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
@Disabled
@Test
void readComments() {
try (var iter = RedditDb.getComments(Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/weightroom.8eda94e.db"))) {
for (int i = 0; i < 10; i++) {
var entry = iter.next();
System.out.println(entry);
System.out.println(LocalDate.ofInstant(Instant.ofEpochSecond(entry.created_utc), ZoneOffset.UTC).getYear());
}
} catch (SQLException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}

View File

@ -1,23 +1,15 @@
package nu.marginalia.model.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import lombok.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
@Getter
@Setter
@NoArgsConstructor

View File

@ -53,6 +53,7 @@ dependencies {
implementation project(':code:features-convert:keyword-extraction')
implementation project(':code:features-convert:summary-extraction')
implementation project(':code:features-convert:stackexchange-xml')
implementation project(':code:features-convert:reddit-json')
implementation project(':code:features-crawl:crawl-blocklist')
implementation project(':code:features-crawl:link-parser')

View File

@ -28,6 +28,7 @@ import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.Collection;
@ -247,6 +248,9 @@ public class ConverterMain extends ProcessMainClass {
try {
var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class);
// will be null on ConvertCrawlData
final Path inputPath = request.getInputPath();
return switch (request.action) {
case ConvertCrawlData -> {
var crawlData = fileStorageService.getStorage(request.crawlStorage);
@ -261,7 +265,8 @@ public class ConverterMain extends ProcessMainClass {
case SideloadEncyclopedia -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource), request.baseUrl),
yield new SideloadAction(
sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl),
processData.asPath(),
msg, inbox);
}
@ -269,7 +274,7 @@ public class ConverterMain extends ProcessMainClass {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadDirtree(Path.of(request.inputSource)),
sideloadSourceFactory.sideloadDirtree(inputPath),
processData.asPath(),
msg, inbox);
}
@ -277,14 +282,23 @@ public class ConverterMain extends ProcessMainClass {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)),
sideloadSourceFactory.sideloadWarc(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadReddit -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(
sideloadSourceFactory.sideloadReddit(inputPath),
processData.asPath(),
msg, inbox);
}
case SideloadStackexchange -> {
var processData = fileStorageService.getStorage(request.processedDataStorage);
yield new SideloadAction(sideloadSourceFactory.sideloadStackexchange(Path.of(request.inputSource)),
yield new SideloadAction(
sideloadSourceFactory.sideloadStackexchange(inputPath),
processData.asPath(),
msg, inbox);
}

View File

@ -6,8 +6,9 @@ import nu.marginalia.atags.AnchorTextKeywords;
import nu.marginalia.atags.source.AnchorTagsSourceFactory;
import nu.marginalia.converting.sideload.dirtree.DirtreeSideloaderFactory;
import nu.marginalia.converting.sideload.encyclopedia.EncyclopediaMarginaliaNuSideloader;
import nu.marginalia.converting.sideload.reddit.RedditSideloader;
import nu.marginalia.converting.sideload.stackexchange.StackexchangeSideloader;
import nu.marginalia.converting.sideload.warc.WarcSideloadFactory;
import nu.marginalia.converting.sideload.warc.WarcSideloader;
import nu.marginalia.keyword.DocumentKeywordExtractor;
import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider;
@ -15,8 +16,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Predicate;
public class SideloadSourceFactory {
private final Gson gson;
@ -26,16 +29,15 @@ public class SideloadSourceFactory {
private final AnchorTextKeywords anchorTextKeywords;
private final AnchorTagsSourceFactory anchorTagsSourceFactory;
private final DirtreeSideloaderFactory dirtreeSideloaderFactory;
private final WarcSideloadFactory warcSideloadFactory;
@Inject
public SideloadSourceFactory(Gson gson,
SideloaderProcessing sideloaderProcessing,
ThreadLocalSentenceExtractorProvider sentenceExtractorProvider,
DocumentKeywordExtractor documentKeywordExtractor, AnchorTextKeywords anchorTextKeywords,
DocumentKeywordExtractor documentKeywordExtractor,
AnchorTextKeywords anchorTextKeywords,
AnchorTagsSourceFactory anchorTagsSourceFactory,
DirtreeSideloaderFactory dirtreeSideloaderFactory,
WarcSideloadFactory warcSideloadFactory) {
DirtreeSideloaderFactory dirtreeSideloaderFactory) {
this.gson = gson;
this.sideloaderProcessing = sideloaderProcessing;
this.sentenceExtractorProvider = sentenceExtractorProvider;
@ -43,7 +45,6 @@ public class SideloadSourceFactory {
this.anchorTextKeywords = anchorTextKeywords;
this.anchorTagsSourceFactory = anchorTagsSourceFactory;
this.dirtreeSideloaderFactory = dirtreeSideloaderFactory;
this.warcSideloadFactory = warcSideloadFactory;
}
public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile, String baseUrl) throws SQLException {
@ -55,24 +56,98 @@ public class SideloadSourceFactory {
}
public Collection<? extends SideloadSource> sideloadWarc(Path pathToWarcFiles) throws IOException {
return warcSideloadFactory.createSideloaders(pathToWarcFiles);
return sideload(pathToWarcFiles,
new PathSuffixPredicate(".warc", ".warc.gz"),
(Path file) -> new WarcSideloader(file, sideloaderProcessing)
);
}
public SideloadSource sideloadReddit(Path pathToDbFiles) throws IOException {
return sideload(pathToDbFiles,
new PathSuffixPredicate(".db"),
(List<Path> paths) -> new RedditSideloader(paths, sentenceExtractorProvider, documentKeywordExtractor));
}
public Collection<? extends SideloadSource> sideloadStackexchange(Path pathToDbFileRoot) throws IOException {
if (Files.isRegularFile(pathToDbFileRoot)) {
return List.of(new StackexchangeSideloader(pathToDbFileRoot, sentenceExtractorProvider, documentKeywordExtractor));
return sideload(pathToDbFileRoot,
new PathSuffixPredicate(".db"),
(Path dbFile) -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor)
);
}
interface SideloadConstructorMany {
SideloadSource construct(List<Path> paths) throws IOException;
}
interface SideloadConstructorSingle {
SideloadSource construct(Path paths) throws IOException;
}
Collection<? extends SideloadSource> sideload(
Path path,
Predicate<Path> pathPredicate,
SideloadConstructorSingle constructor
) throws IOException {
if (Files.isRegularFile(path)) {
return List.of(constructor.construct(path));
}
else if (Files.isDirectory(pathToDbFileRoot)) {
try (var dirs = Files.walk(pathToDbFileRoot)) {
return dirs
else if (Files.isDirectory(path)) {
try (var dirs = Files.walk(path)) {
List<SideloadSource> sideloadSources = new ArrayList<>();
dirs
.filter(Files::isRegularFile)
.filter(f -> f.toFile().getName().endsWith(".db"))
.map(dbFile -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor))
.toList();
.filter(pathPredicate)
.forEach(file -> {
try {
sideloadSources.add(constructor.construct(file));
} catch (IOException e) {
throw new RuntimeException(e);
}
});
return sideloadSources;
}
}
else { // unix socket, etc
throw new IllegalArgumentException("Path to stackexchange db file(s) must be a file or directory");
}
}
SideloadSource sideload(
Path path,
Predicate<Path> pathPredicate,
SideloadConstructorMany constructor
) throws IOException {
if (Files.isRegularFile(path)) {
return constructor.construct(List.of(path));
}
else if (Files.isDirectory(path)) {
try (var dirs = Files.walk(path)) {
var paths = dirs
.filter(Files::isRegularFile)
.filter(pathPredicate)
.toList();
return constructor.construct(paths);
}
}
else { // unix socket, etc
throw new IllegalArgumentException("Path to stackexchange db file(s) must be a file or directory");
}
}
private static class PathSuffixPredicate implements Predicate<Path> {
private final List<String> endings;
public PathSuffixPredicate(String... endings) {
this.endings = List.of(endings);
}
@Override
public boolean test(Path path) {
String fileName = path.toFile().getName();
return endings.stream().anyMatch(fileName::endsWith);
}
}
}
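
For context, the factory accepts either a single .db file or a directory of them; sketched usage (names illustrative, mirroring RedditSideloaderTest further down):

// 'factory' is an injected SideloadSourceFactory; 'redditUploadDir' holds
// one or more converted .db files.
SideloadSource source = factory.sideloadReddit(redditUploadDir);
ProcessedDomain domain = source.getDomain();  // synthetic old.reddit.com domain
Iterator<ProcessedDocument> docs = source.getDocumentsStream();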

View File

@ -0,0 +1,195 @@
package nu.marginalia.converting.sideload.reddit;
import nu.marginalia.atags.model.DomainLinks;
import nu.marginalia.converting.model.GeneratorType;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDocumentDetails;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.integration.reddit.db.RedditDb;
import nu.marginalia.keyword.DocumentKeywordExtractor;
import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.crawl.HtmlFeature;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.model.crawl.UrlIndexingState;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.idx.DocumentFlags;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.model.idx.WordFlags;
import nu.marginalia.util.ProcessingIterator;
import org.apache.commons.lang3.StringUtils;
import org.jsoup.Jsoup;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
public class RedditSideloader implements SideloadSource {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RedditSideloader.class);
private final List<Path> dbFiles;
private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider;
private final DocumentKeywordExtractor keywordExtractor;
public RedditSideloader(List<Path> listToDbFiles,
ThreadLocalSentenceExtractorProvider sentenceExtractorProvider,
DocumentKeywordExtractor keywordExtractor) {
this.dbFiles = listToDbFiles;
this.sentenceExtractorProvider = sentenceExtractorProvider;
this.keywordExtractor = keywordExtractor;
}
@Override
public ProcessedDomain getDomain() {
var ret = new ProcessedDomain();
ret.domain = new EdgeDomain("old.reddit.com");
ret.ip = "127.0.0.1";
ret.state = DomainIndexingState.ACTIVE;
ret.sizeloadSizeAdvice = 5_000_000;
return ret;
}
@Override
public Iterator<ProcessedDocument> getDocumentsStream() {
return ProcessingIterator.factory(24, 16).create((taskConsumer) -> {
DomainLinks domainLinks = new DomainLinks();
for (var dbFile : dbFiles) {
try (var submissions = RedditDb.getSubmissions(dbFile)) {
while (submissions.hasNext()) {
var entry = submissions.next();
taskConsumer.accept(() ->
convertDocument(entry.selftext,
entry.subreddit,
entry.title,
entry.author,
entry.permalink,
entry.created_utc,
entry.score,
domainLinks)
);
}
} catch (Exception e) {
logger.error("Error reading db file", e);
}
try (var comments = RedditDb.getComments(dbFile)) {
while (comments.hasNext()) {
var entry = comments.next();
taskConsumer.accept(() ->
convertDocument(entry.body,
entry.subreddit,
entry.title,
entry.author,
entry.permalink,
entry.created_utc,
entry.score,
domainLinks)
);
}
} catch (Exception e) {
logger.error("Error reading db file", e);
}
}
});
}
private ProcessedDocument convertDocument(String body,
String subreddit,
String title,
String author,
String permalink,
int createdUtc,
int score,
DomainLinks domainLinks) throws URISyntaxException {
String fullUrl = "https://old.reddit.com" + permalink;
StringBuilder fullHtml = new StringBuilder();
fullHtml.append("<!DOCTYPE html><html><head><title>").append(title).append("</title></head><body>");
fullHtml.append("<h1>").append(title).append("</h1>");
fullHtml.append("<p>").append(body).append("</p>");
fullHtml.append("</body></html>");
var ret = new ProcessedDocument();
try {
var url = new EdgeUrl(fullUrl);
var doc = Jsoup.parse(fullHtml.toString());
var dld = sentenceExtractorProvider.get().extractSentences(doc);
ret.url = url;
ret.words = keywordExtractor.extractKeywords(dld, url);
ret.words.addAllSyntheticTerms(List.of(
"js:true",
"site:reddit.com",
"site:old.reddit.com",
"site:www.reddit.com",
"special:ads",
"special:tracking",
"generator:forum",
subreddit
));
ret.words.add(subreddit, WordFlags.Subjects.asBit());
ret.words.add("reddit",
WordFlags.ExternalLink.asBit()
| WordFlags.Subjects.asBit()
| WordFlags.Synthetic.asBit()
| WordFlags.NamesWords.asBit());
ret.words.add(subreddit.toLowerCase(),
WordFlags.ExternalLink.asBit()
| WordFlags.NamesWords.asBit()
| WordFlags.Synthetic.asBit()
);
if (!"[deleted]".equals(author))
ret.words.add(author, WordFlags.NamesWords.asBit() | WordFlags.Synthetic.asBit());
var date = LocalDate.ofInstant(
Instant.ofEpochSecond(createdUtc),
ZoneOffset.UTC);
int year = date.getYear();
ret.details = new ProcessedDocumentDetails();
ret.details.pubYear = year;
ret.details.quality = -5;
ret.details.metadata = new DocumentMetadata(3,
PubDate.toYearByte(year),
(int) -ret.details.quality,
EnumSet.of(DocumentFlags.GeneratorForum));
ret.details.features = EnumSet.of(HtmlFeature.JS, HtmlFeature.TRACKING);
ret.details.metadata.withSizeAndTopology(10000, score);
ret.details.generator = GeneratorType.DOCS;
ret.details.title = StringUtils.truncate(STR."[/r/\{subreddit}] \{title}", 128);
ret.details.description = StringUtils.truncate(body, 255);
ret.details.length = 128;
ret.details.standard = HtmlStandard.HTML5;
ret.details.feedLinks = List.of();
ret.details.linksExternal = List.of();
ret.details.linksInternal = List.of();
ret.state = UrlIndexingState.OK;
ret.stateReason = "SIDELOAD";
}
catch (Exception e) {
logger.warn("Failed to process document", e);
ret.url = new EdgeUrl(fullUrl);
ret.state = UrlIndexingState.DISQUALIFIED;
ret.stateReason = "SIDELOAD";
}
return ret;
};
}

View File

@ -1,57 +0,0 @@
package nu.marginalia.converting.sideload.warc;
import com.google.inject.Inject;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloaderProcessing;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
public class WarcSideloadFactory {
private final SideloaderProcessing processing;
@Inject
public WarcSideloadFactory(SideloaderProcessing processing) {
this.processing = processing;
}
public Collection<? extends SideloadSource> createSideloaders(Path pathToWarcFiles) throws IOException {
if (Files.isRegularFile(pathToWarcFiles)) {
return List.of(new WarcSideloader(pathToWarcFiles, processing));
}
else if (Files.isDirectory(pathToWarcFiles)) {
final List<Path> files = new ArrayList<>();
try (var stream = Files.list(pathToWarcFiles)) {
stream
.filter(Files::isRegularFile)
.filter(this::isWarcFile)
.forEach(files::add);
}
List<WarcSideloader> sources = new ArrayList<>();
for (Path file : files) {
sources.add(new WarcSideloader(file, processing));
}
return sources;
}
else {
throw new IllegalArgumentException("Path " + pathToWarcFiles + " is neither a file nor a directory");
}
}
private boolean isWarcFile(Path path) {
return path.toString().endsWith(".warc")
|| path.toString().endsWith(".warc.gz");
}
}

View File

@ -35,9 +35,9 @@ public class WarcSideloader implements SideloadSource, AutoCloseable {
private final EdgeDomain domain;
@SneakyThrows
public WarcSideloader(Path warcFile,
SideloaderProcessing sideloaderProcessing)
throws IOException
{
this.sideloaderProcessing = sideloaderProcessing;
this.reader = new WarcReader(warcFile);

View File

@ -0,0 +1,55 @@
package nu.marginalia.converting.sideload.reddit;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import nu.marginalia.ProcessConfiguration;
import nu.marginalia.converting.ConverterModule;
import nu.marginalia.converting.processor.ConverterDomainTypes;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.UUID;
import static org.mockito.Mockito.when;
class RedditSideloaderTest extends AbstractModule {
/* This test case exists for debugging; it exercises the Reddit sideloader end to end to see if it can read the files.
* Update the path to the Reddit database in the dbPath variable.
* */
private static final Path dbPath = Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/");
private SideloadSourceFactory sourceFactory;
@BeforeEach
public void setUp() throws IOException {
sourceFactory = Guice.createInjector(new ConverterModule(), this)
.getInstance(SideloadSourceFactory.class);
}
public void configure() {
var domainTypesMock = Mockito.mock(ConverterDomainTypes.class);
when(domainTypesMock.isBlog(Mockito.any())).thenReturn(false);
bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("test", 1, UUID.randomUUID()));
bind(ConverterDomainTypes.class).toInstance(domainTypesMock);
}
@Test
void getDocumentsStream() throws IOException {
if (Files.notExists(dbPath)) {
return;
}
var sideloader = sourceFactory.sideloadReddit(dbPath);
sideloader.getDomain();
var stream = sideloader.getDocumentsStream();
for (int i = 0; i < 10; i++) {
var next = stream.next();
System.out.println(next);
}
}
}

View File

@ -58,6 +58,9 @@ public class ControlNodeActionsService {
Spark.post("/public/nodes/:node/actions/sideload-dirtree", this::sideloadDirtree,
redirectControl.renderRedirectAcknowledgement("Sideloading", "..")
);
Spark.post("/public/nodes/:node/actions/sideload-reddit", this::sideloadReddit,
redirectControl.renderRedirectAcknowledgement("Sideloading", "..")
);
Spark.post("/public/nodes/:node/actions/sideload-warc", this::sideloadWarc,
redirectControl.renderRedirectAcknowledgement("Sideloading", "..")
);
@ -141,7 +144,18 @@ public class ControlNodeActionsService {
return "";
}
public Object sideloadReddit(Request request, Response response) {
final int nodeId = Integer.parseInt(request.params("node"));
Path sourcePath = parseSourcePath(request.queryParams("source"));
eventLog.logEvent("USER-ACTION", "SIDELOAD REDDIT " + nodeId);
executorClient.sideloadReddit(nodeId, sourcePath);
return "";
}
public Object sideloadWarc(Request request, Response response) {
final int nodeId = Integer.parseInt(request.params("node"));

View File

@ -12,10 +12,10 @@ for more information on how to set this up.
<tr>
<td><input {{#if directory}}disabled{{/if}} class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td {{#if directory}}class="text-muted"{{/if}}>
<label class="form-check-label" for="inlineRadio1">{{name}}{{#if directory}}/{{/if}}</label>
<label class="form-check-label" for="{{name}}">{{name}}{{#if directory}}/{{/if}}</label>
</td>
<td>{{#unless directory}}{{size}}{{/unless}}</td>
<td>{{lastModifiedTime}}</td>
<td title={{lastModifiedTime}}>{{shortTimestamp lastModifiedTime}}</td>
</tr>
{{/each}}
{{#unless uploadDirContents.items}}

View File

@ -22,10 +22,10 @@
<tr>
<td><input {{#if directory}}disabled{{/if}} class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td {{#if directory}}class="text-muted"{{/if}}>
<label class="form-check-label" for="inlineRadio1">{{name}}{{#if directory}}/{{/if}}</label>
<label class="form-check-label" for="{{name}}">{{name}}{{#if directory}}/{{/if}}</label>
</td>
<td>{{#unless directory}}{{size}}{{/unless}}</td>
<td>{{lastModifiedTime}}</td>
<td title={{lastModifiedTime}}>{{shortTimestamp lastModifiedTime}}</td>
</tr>
{{/each}}
{{#unless uploadDirContents.items}}

View File

@ -0,0 +1,36 @@
<h1 class="my-3">Sideload Reddit</h1>
<div class="my-3 p-3 border bg-light">
This will index a pushshift.io "top-n subreddits" dump from the Reddit API into the index.
</div>
<form method="post" action="actions/sideload-reddit" onsubmit="return confirm('Confirm sideloading')">
<div class="my-3 py-3">
<table class="table">
<th></th><th>Filename</th><th>Size</th><th>Last Modified</th>
{{#each uploadDirContents.items}}
<tr>
<td><input {{#unless directory}}disabled{{/unless}} class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td {{#unless directory}}class="text-muted"{{/unless}}>
<label class="form-check-label" for="{{name}}">{{name}}{{#if directory}}/{{/if}}</label>
</td>
<td>{{#unless directory}}{{size}}{{/unless}}</td>
<td title={{lastModifiedTime}}>{{shortTimestamp lastModifiedTime}}</td>
</tr>
{{/each}}
{{#unless uploadDirContents.items}}
<tr>
<td colspan="3">Nothing found in upload directory</td>
</tr>
{{/unless}}
</table>
<p>
<small class="text-muted">
The upload directory is typically mounted to /uploads on the server. The external
directory is typically something like index-{{node.id}}/uploads.
</small>
</p>
<button type="submit" {{#unless uploadDirContents.items}}disabled{{/unless}} class="btn btn-primary">Sideload Reddit</button>
</div>
</form>

View File

@ -13,12 +13,12 @@ information how to do this.
<th></th><th>Filename</th><th>Size</th><th>Last Modified</th>
{{#each uploadDirContents.items}}
<tr>
<td><input {{#unless directory}}disabled{{/unless}} class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td {{#unless directory}}class="text-muted"{{/unless}}>
<label class="form-check-label" for="inlineRadio1">{{name}}{{#if directory}}/{{/if}}</label>
<td><input class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td>
<label class="form-check-label" for="{{name}}">{{name}}{{#if directory}}/{{/if}}</label>
</td>
<td>{{#unless directory}}{{size}}{{/unless}}</td>
<td>{{lastModifiedTime}}</td>
<td title={{lastModifiedTime}}>{{shortTimestamp lastModifiedTime}}</td>
</tr>
{{/each}}
{{#unless uploadDirContents.items}}

View File

@ -12,12 +12,12 @@ A warc export can be created using e.g. wget: <p>
<th></th><th>Filename</th><th>Size</th><th>Last Modified</th>
{{#each uploadDirContents.items}}
<tr>
<td><input {{#unless directory}}disabled{{/unless}} class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td {{#unless directory}}class="text-muted"{{/unless}}>
<label class="form-check-label" for="inlineRadio1">{{name}}{{#if directory}}/{{/if}}</label>
<td><input class="form-check-input" type="radio" name="source" id="{{name}}" value="{{name}}"></td>
<td>
<label class="form-check-label" for="{{name}}">{{name}}{{#if directory}}/{{/if}}</label>
</td>
<td>{{#unless directory}}{{size}}{{/unless}}</td>
<td>{{lastModifiedTime}}</td>
<td title={{lastModifiedTime}}>{{shortTimestamp lastModifiedTime}}</td>
</tr>
{{/each}}
{{#unless uploadDirContents.items}}

View File

@ -19,6 +19,7 @@
{{#if view.sideload-stackexchange}} {{> control/node/actions/partial-sideload-stackexchange }} {{/if}}
{{#if view.sideload-warc}} {{> control/node/actions/partial-sideload-warc }} {{/if}}
{{#if view.sideload-dirtree}} {{> control/node/actions/partial-sideload-dirtree }} {{/if}}
{{#if view.sideload-reddit}} {{> control/node/actions/partial-sideload-reddit }} {{/if}}
{{#if view.export-db-data}} {{> control/node/actions/partial-export-db-data }} {{/if}}
{{#if view.export-from-crawl-data}} {{> control/node/actions/partial-export-from-crawl-data }} {{/if}}
{{#if view.export-sample-data}} {{> control/node/actions/partial-export-sample-data }} {{/if}}

View File

@ -24,6 +24,7 @@
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-stackexchange">Sideload Stackexchange</a></li>
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-warc">Sideload WARC Files</a></li>
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-dirtree">Sideload Dirtree</a></li>
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=sideload-reddit">Sideload Reddit</a></li>
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=download-sample-data">Download Sample Crawl Data</a></li>
<li><hr class="dropdown-divider"></li>
<li><a class="dropdown-item" href="/nodes/{{node.id}}/actions?view=export-db-data">Export Database Data</a></li>

View File

@ -41,6 +41,7 @@ dependencies {
implementation project(':code:features-crawl:link-parser')
implementation project(':code:features-convert:data-extractors')
implementation project(':code:features-convert:stackexchange-xml')
implementation project(':code:features-convert:reddit-json')
implementation project(':code:features-index:index-journal')
implementation project(':code:api:index-api')
implementation project(':code:api:query-api')

View File

@ -10,6 +10,7 @@ import nu.marginalia.actor.state.Resume;
import nu.marginalia.encyclopedia.EncyclopediaConverter;
import nu.marginalia.process.ProcessOutboxes;
import nu.marginalia.process.ProcessService;
import nu.marginalia.sideload.RedditSideloadHelper;
import nu.marginalia.sideload.SideloadHelper;
import nu.marginalia.sideload.StackExchangeSideloadHelper;
import nu.marginalia.storage.FileStorageService;
@ -38,6 +39,7 @@ public class ConvertActor extends RecordActorPrototype {
public record PredigestEncyclopedia(String source, String dest, String baseUrl) implements ActorStep {};
public record ConvertDirtree(String source) implements ActorStep {};
public record ConvertWarc(String source) implements ActorStep {};
public record ConvertReddit(String source) implements ActorStep {};
public record ConvertStackexchange(String source) implements ActorStep {};
@Resume(behavior = ActorResumeBehavior.RETRY)
public record ConvertWait(FileStorageId destFid,
@ -96,6 +98,28 @@ public class ConvertActor extends RecordActorPrototype {
mqConverterOutbox.sendAsync(ConvertRequest.forWarc(sourcePath, processedArea.id()))
);
}
case ConvertReddit(String source) -> {
Path sourcePath = Path.of(source);
if (!Files.exists(sourcePath))
yield new Error("Source path does not exist: " + sourcePath);
String fileName = sourcePath.toFile().getName();
var processedArea = storageService.allocateStorage(
FileStorageType.PROCESSED_DATA, "processed-data",
"Processed Reddit Data; " + fileName);
storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW);
// Convert reddit data to sqlite database
// (we can't use a Predigest- step here because the conversion is too complicated)
RedditSideloadHelper.convertRedditData(sourcePath);
yield new ConvertWait(
processedArea.id(),
mqConverterOutbox.sendAsync(ConvertRequest.forReddit(sourcePath, processedArea.id()))
);
}
case ConvertEncyclopedia(String source, String baseUrl) -> {
Path sourcePath = Path.of(source);

View File

@ -162,6 +162,18 @@ public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase {
}
}
@Override
public void sideloadReddit(RpcSideloadReddit request, StreamObserver<Empty> responseObserver) {
try {
sideloadService.sideloadReddit(request);
responseObserver.onNext(Empty.getDefaultInstance());
responseObserver.onCompleted();
}
catch (Exception e) {
responseObserver.onError(e);
}
}
@Override
public void sideloadWarc(RpcSideloadWarc request, StreamObserver<Empty> responseObserver) {
try {

View File

@ -16,6 +16,7 @@ import java.nio.file.Path;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Comparator;
public class SideloadService {
private final ExecutorActorControlService actorControlService;
@ -32,6 +33,12 @@ public class SideloadService {
);
}
public void sideloadReddit(RpcSideloadReddit request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertReddit(request.getSourcePath())
);
}
public void sideloadWarc(RpcSideloadWarc request) throws Exception {
actorControlService.startFrom(ExecutorActor.CONVERT,
new ConvertActor.ConvertWarc(request.getSourcePath())
@ -55,7 +62,10 @@ public class SideloadService {
public RpcUploadDirContents listUploadDir() throws IOException {
Path uploadDir = WmsaHome.getUploadDir();
try (var items = Files.list(uploadDir)) {
try (var items = Files.list(uploadDir).sorted(
Comparator.comparing((Path d) -> Files.isDirectory(d)).reversed()
.thenComparing(path -> path.getFileName().toString())
)) {
var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString());
var iter = items.iterator();

View File

@ -0,0 +1,104 @@
package nu.marginalia.sideload;
import nu.marginalia.integration.reddit.db.RedditDb;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
/** Contains helper functions for pre-converting pushshift.io style reddit zstd
* files to marginalia-digestible sqlite databases*/
public class RedditSideloadHelper {
private static final Logger logger = LoggerFactory.getLogger(RedditSideloadHelper.class);
/** Looks for pairs of reddit zstd files in the given path and converts them to sqlite databases.
* The function is idempotent, so it is safe to call it multiple times on the same path
* (it will not re-convert files that have already been successfully converted)
* */
public static void convertRedditData(Path sourcePath) {
if (!Files.isDirectory(sourcePath)) {
throw new UnsupportedOperationException("RedditSideloadHelper.convertRedditData only supports directories");
}
Set<String> allFileNames;
try (var contents = Files.list(sourcePath)) {
allFileNames = contents.filter(Files::isRegularFile)
.map(Path::toFile)
.map(File::getName)
.filter(name -> name.endsWith(".zst"))
.sorted()
.collect(Collectors.toCollection(TreeSet::new));
} catch (IOException ex) {
logger.warn("Failed to convert reddit zstd file to sqlite database", ex);
return;
}
int parallelism = Math.clamp(ForkJoinPool.getCommonPoolParallelism(), 1, Runtime.getRuntime().availableProcessors() / 2);
try (var executor = Executors.newWorkStealingPool(parallelism))
{
for (var fileName : allFileNames) {
if (!fileName.endsWith(RedditFilePair.submissionsSuffix)) {
continue;
}
String baseName = fileName.substring(0, fileName.length() - RedditFilePair.submissionsSuffix.length());
String commentsFileName = baseName + RedditFilePair.commentsSuffix;
if (!allFileNames.contains(commentsFileName)) {
logger.warn("Skipping reddit file pair {} because it is missing the comments file", fileName);
continue;
}
executor.submit(() -> convertSingleRedditFile(new RedditFilePair(sourcePath, baseName)));
}
}
}
record RedditFilePair(Path rootDir, String fileNameBase) {
static String submissionsSuffix = "_submissions.zst";
static String commentsSuffix = "_comments.zst";
public String submissionsFileName() { return fileNameBase + submissionsSuffix; }
public String commentsFileName() { return fileNameBase + commentsSuffix; }
public Path submissionsPath() { return rootDir.resolve(submissionsFileName()); }
public Path commentsPath() { return rootDir.resolve(commentsFileName()); }
}
private static void convertSingleRedditFile(RedditFilePair files) {
try {
Path destPath = getRedditDbPath(files);
if (Files.exists(destPath)) // already converted
return;
Path tempFile = Files.createTempFile(destPath.getParent(), "processed", "db.tmp");
try {
logger.info("Converting reddit zstd file {} to sqlite database", files.fileNameBase);
RedditDb.create(files.submissionsPath(), files.commentsPath(), tempFile);
logger.info("Finished converting reddit zstd file {} to sqlite database", files.fileNameBase);
Files.move(tempFile, destPath, StandardCopyOption.REPLACE_EXISTING);
} catch (Exception e) {
logger.error("Failed to convert reddit zstd file to sqlite database", e);
Files.deleteIfExists(tempFile);
Files.deleteIfExists(destPath);
}
} catch (IOException ex) {
logger.warn("Failed to convert reddit zstd file to sqlite database", ex);
}
}
private static Path getRedditDbPath(RedditFilePair pair) throws IOException {
String hash = SideloadHelper.getCrc32FileHash(pair.commentsPath());
return pair.rootDir().resolve(STR."\{pair.fileNameBase}.\{hash}.db");
}
}
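
A worked example of the naming scheme, using the file names from the tests earlier in this change (the hash is the CRC32 of the comments file, which is what makes re-runs idempotent):

// Input pair in the upload directory:
//   weightroom_submissions.zst
//   weightroom_comments.zst
// Output written alongside, skipped on subsequent runs:
//   weightroom.8eda94e.db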

View File

@ -37,6 +37,7 @@ include 'code:features-convert:adblock'
include 'code:features-convert:anchor-keywords'
include 'code:features-convert:data-extractors'
include 'code:features-convert:stackexchange-xml'
include 'code:features-convert:reddit-json'
include 'code:features-convert:pubdate'
include 'code:features-convert:summary-extraction'
include 'code:features-convert:keyword-extraction'