diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java index c9b286d5..7f301261 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java @@ -142,7 +142,13 @@ public class ExecutorClient extends AbstractDynamicClient { .build() ); } - + public void sideloadReddit(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadReddit( + RpcSideloadReddit.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); + } public void sideloadWarc(int node, Path sourcePath) { stubPool.apiForNode(node).sideloadWarc( RpcSideloadWarc.newBuilder() diff --git a/code/api/executor-api/src/main/protobuf/executor-api.proto b/code/api/executor-api/src/main/protobuf/executor-api.proto index bc05844e..2c2756f2 100644 --- a/code/api/executor-api/src/main/protobuf/executor-api.proto +++ b/code/api/executor-api/src/main/protobuf/executor-api.proto @@ -20,6 +20,7 @@ service ExecutorApi { rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {} rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {} rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {} + rpc sideloadReddit(RpcSideloadReddit) returns (Empty) {} rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {} rpc createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {} @@ -58,6 +59,9 @@ message RpcSideloadDirtree { message RpcSideloadWarc { string sourcePath = 1; } +message RpcSideloadReddit { + string sourcePath = 1; +} message RpcSideloadStackexchange { string sourcePath = 1; } diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java index 17102c06..5ab8e4ce 100644 --- 
a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java @@ -5,5 +5,6 @@ public enum ConvertAction { SideloadEncyclopedia, SideloadDirtree, SideloadWarc, + SideloadReddit, SideloadStackexchange } diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java index cf445e5a..51d678e3 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java @@ -1,6 +1,7 @@ package nu.marginalia.mqapi.converting; import lombok.AllArgsConstructor; +import lombok.NonNull; import nu.marginalia.storage.model.FileStorageId; import java.nio.file.Path; @@ -13,6 +14,13 @@ public class ConvertRequest { public final FileStorageId processedDataStorage; public final String baseUrl; + public Path getInputPath() { + if (inputSource == null) + return null; + + return Path.of(inputSource); + } + public static ConvertRequest forCrawlData(FileStorageId sourceId, FileStorageId destId) { return new ConvertRequest( ConvertAction.ConvertCrawlData, @@ -45,7 +53,13 @@ public class ConvertRequest { destId, null); } - + public static ConvertRequest forReddit(Path sourcePath, FileStorageId destId) { + return new ConvertRequest(ConvertAction.SideloadReddit, + sourcePath.toString(), + null, + destId, + null); + } public static ConvertRequest forStackexchange(Path sourcePath, FileStorageId destId) { return new ConvertRequest(ConvertAction.SideloadStackexchange, sourcePath.toString(), diff --git a/code/features-convert/reddit-json/build.gradle b/code/features-convert/reddit-json/build.gradle new file mode 100644 index 00000000..08420127 --- /dev/null +++ b/code/features-convert/reddit-json/build.gradle @@ -0,0 +1,39 @@ +plugins { + 
id 'java' + + id 'jvm-test-suite' +} + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(21)) + } +} + +dependencies { + implementation libs.bundles.slf4j + + implementation project(':code:libraries:blocking-thread-pool') + implementation project(':code:common:model') + implementation libs.notnull + + implementation libs.jsoup + implementation libs.sqlite + + implementation libs.guice + implementation libs.guava + implementation libs.gson + implementation libs.zstd + implementation libs.trove + implementation libs.commons.compress + implementation libs.xz + + testImplementation libs.bundles.slf4j.test + testImplementation libs.bundles.junit + testImplementation libs.mockito +} + +test { + maxHeapSize = "8G" + useJUnitPlatform() +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java new file mode 100644 index 00000000..af4ceee9 --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java @@ -0,0 +1,60 @@ +package nu.marginalia.integration.reddit; + +import com.github.luben.zstd.ZstdInputStream; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import nu.marginalia.integration.reddit.model.RawRedditComment; +import nu.marginalia.integration.reddit.model.RawRedditSubmission; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Iterator; + +public class RedditEntryReader { + public static EntryIterator readSubmissions(Path file) throws IOException { + return new EntryIterator<>(file, RawRedditSubmission.class); + } + + public static EntryIterator readComments(Path file) throws IOException { + return new EntryIterator<>(file, 
RawRedditComment.class);
+    }
+
+    /** Iterates over the top-level JSON objects of a zstd-compressed dump file,
+     *  deserializing each one into an instance of {@code T} with Gson.
+     *  <p>
+     *  The iterator owns the underlying file stream and must be closed after use.
+     *
+     * @param <T> the model class each JSON object is mapped onto
+     */
+    public static class EntryIterator<T> implements Iterator<T>, AutoCloseable {
+        private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(EntryIterator.class);
+
+        private final JsonReader reader;
+        private final Class<T> type;
+        private final Gson gson = new GsonBuilder().create();
+
+        /**
+         * @param file zstd-compressed file containing a sequence of JSON objects
+         * @param type class to deserialize each object into
+         * @throws IOException if the file cannot be opened or the zstd stream cannot be set up
+         */
+        private EntryIterator(Path file, Class<T> type) throws IOException {
+            this.type = type;
+
+            var fileStream = Files.newInputStream(file, StandardOpenOption.READ);
+            try {
+                // Decode explicitly as UTF-8 instead of relying on the platform default charset
+                reader = new JsonReader(new InputStreamReader(new ZstdInputStream(fileStream), java.nio.charset.StandardCharsets.UTF_8));
+            }
+            catch (IOException | RuntimeException e) {
+                // Don't leak the file handle if the decompressor can't be constructed
+                try {
+                    fileStream.close();
+                }
+                catch (IOException closeEx) {
+                    e.addSuppressed(closeEx);
+                }
+                throw e;
+            }
+
+            // Set the reader to be lenient to allow multiple top level objects
+            reader.setLenient(true);
+        }
+
+        /** Returns true if the reader reports another value to read.
+         *  A read error ends the iteration (returns false) and is logged. */
+        @Override
+        public boolean hasNext() {
+            try {
+                return reader.hasNext();
+            } catch (IOException e) {
+                logger.error("Error reading entry stream, ending iteration early", e);
+                return false;
+            }
+        }
+
+        /** Deserializes and returns the next object.
+         *
+         * @throws java.util.NoSuchElementException if there are no more entries
+         */
+        @Override
+        public T next() {
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            return gson.fromJson(reader, type);
+        }
+
+        @Override
+        public void close() throws IOException {
+            reader.close();
+        }
+    }
+}
diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java
new file mode 100644
index 00000000..a4612631
--- /dev/null
+++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java
@@ -0,0 +1,203 @@
+package nu.marginalia.integration.reddit.db;
+
+import com.google.common.base.Strings;
+import lombok.SneakyThrows;
+import nu.marginalia.integration.reddit.RedditEntryReader;
+import nu.marginalia.integration.reddit.model.ProcessableRedditComment;
+import nu.marginalia.integration.reddit.model.ProcessableRedditSubmission;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.sql.*;
+import java.util.Iterator;
+
+/** This class is responsible for creating and accessing a sqlite database with reddit data, for
+ * easier aggregation and subsequent processing. 
*/
+public class RedditDb {
+    /** Builds a fresh sqlite database at {@code dbFile} from a submissions dump and
+     *  a comments dump. Any existing file at {@code dbFile} is deleted first.
+     *  <p>
+     *  Every submission with a non-empty name is stored, but its selftext is only
+     *  kept when the score is above 10 and the text is longer than 1000 characters.
+     *  Comments are stored only when they are at least 1000 characters long, have
+     *  a score of at least 10, and have a parent id starting with "t3".
+     *
+     * @param submissionsFile zstd-compressed submissions dump
+     * @param commentsFile zstd-compressed comments dump
+     * @param dbFile location of the sqlite database to create
+     * @throws IOException if a dump or the bundled schema cannot be read
+     * @throws SQLException if the database cannot be created or written
+     */
+    public static void create(Path submissionsFile, Path commentsFile, Path dbFile) throws IOException, SQLException
+    {
+        Files.deleteIfExists(dbFile);
+
+        try (var connection = DriverManager.getConnection("jdbc:sqlite:" + dbFile);
+             var stream = ClassLoader.getSystemResourceAsStream("db/reddit.sql");
+             var updateStmt = connection.createStatement()
+        ) {
+            if (stream == null) {
+                // getSystemResourceAsStream signals a missing resource with null
+                throw new IOException("Schema resource db/reddit.sql was not found on the classpath");
+            }
+
+            var sql = new String(stream.readAllBytes(), java.nio.charset.StandardCharsets.UTF_8);
+
+            // The schema file holds several statements; run them one at a time
+            String[] sqlParts = sql.split(";");
+            for (var part : sqlParts) {
+                if (part.isBlank()) {
+                    continue;
+                }
+                updateStmt.executeUpdate(part);
+            }
+            updateStmt.execute("PRAGMA synchronous = OFF");
+
+
+            try (var iter = RedditEntryReader.readSubmissions(submissionsFile);
+                 var stmt = connection.prepareStatement("""
+                         INSERT OR IGNORE INTO submission(id, author, created_utc, score, title, selftext, subreddit, permalink)
+                         VALUES (?, ?, ?, ?, ?, ?, ?, ?)
+                         """)
+            ) {
+                while (iter.hasNext()) {
+                    var submission = iter.next();
+
+                    if (Strings.isNullOrEmpty(submission.name))
+                        continue;
+
+                    // Fields absent from the JSON are left null by Gson; treat a missing selftext as empty
+                    String selftext = submission.selftext;
+                    boolean keepText = selftext != null
+                            && submission.score > 10
+                            && selftext.length() > 1000;
+
+                    stmt.setString(1, submission.name);
+                    stmt.setString(2, submission.author);
+                    stmt.setInt(3, submission.created_utc);
+                    stmt.setInt(4, submission.score);
+                    stmt.setString(5, submission.title);
+                    stmt.setString(6, keepText ? selftext : "");
+                    stmt.setString(7, submission.subreddit);
+                    stmt.setString(8, submission.permalink);
+                    stmt.executeUpdate();
+                }
+
+            }
+            try (var iter = RedditEntryReader.readComments(commentsFile);
+                 var stmt = connection.prepareStatement("""
+                         INSERT OR IGNORE INTO comment(id, author, score, body, threadId)
+                         VALUES (?, ?, ?, ?, ?)
+                         """)
+            )
+            {
+                while (iter.hasNext()) {
+                    var comment = iter.next();
+
+                    if (comment.body == null || comment.body.length() < 1000) continue;
+                    if (comment.score < 10) continue;
+
+                    // We only want to store top-level comments
+                    if (comment.parent_id == null || !comment.parent_id.startsWith("t3")) continue;
+
+                    stmt.setString(1, comment.id);
+                    stmt.setString(2, comment.author);
+                    stmt.setInt(3, comment.score);
+                    stmt.setString(4, comment.body);
+                    stmt.setString(5, comment.parent_id);
+                    stmt.executeUpdate();
+                }
+            }
+        }
+    }
+
+    /** Opens the database at {@code file} and iterates over the submissions that
+     *  have a non-empty selftext. Closing the iterator closes the connection. */
+    public static SubmissionIterator getSubmissions(Path file) throws SQLException {
+        var connection = DriverManager.getConnection("jdbc:sqlite:" + file);
+
+        try {
+            return new SubmissionIterator(connection);
+        }
+        catch (SQLException | RuntimeException e) {
+            closeAfterFailure(connection, e);
+            throw e;
+        }
+    }
+
+    /** Opens the database at {@code file} and iterates over the stored comments
+     *  joined with their submission. Closing the iterator closes the connection. */
+    public static CommentIterator getComments(Path file) throws SQLException {
+        var connection = DriverManager.getConnection("jdbc:sqlite:" + file);
+
+        try {
+            return new CommentIterator(connection);
+        }
+        catch (SQLException | RuntimeException e) {
+            closeAfterFailure(connection, e);
+            throw e;
+        }
+    }
+
+    /** Closes a connection whose iterator could not be created, attaching any
+     *  failure to close as a suppressed exception of the original error. */
+    private static void closeAfterFailure(Connection connection, Exception cause) {
+        try {
+            connection.close();
+        }
+        catch (SQLException closeEx) {
+            cause.addSuppressed(closeEx);
+        }
+    }
+
+    public static class SubmissionIterator extends SqlQueryIterator<ProcessableRedditSubmission> {
+
+        SubmissionIterator(Connection connection) throws SQLException {
+            super(connection, """
+                    SELECT id, author, created_utc, score, title, selftext, subreddit, permalink
+                    FROM submission
+                    WHERE length(selftext) > 0
+                    """);
+        }
+
+        @Override
+        ProcessableRedditSubmission nextFromResultSet(ResultSet resultSet) throws SQLException {
+            return new ProcessableRedditSubmission(resultSet.getString("subreddit"),
+                    resultSet.getString("id"),
+                    resultSet.getString("author"),
+                    resultSet.getString("title"),
+                    resultSet.getString("selftext"),
+                    resultSet.getInt("created_utc"),
+                    resultSet.getString("permalink"),
+                    resultSet.getInt("score")
+            );
+        }
+    }
+
+    public static class CommentIterator extends SqlQueryIterator<ProcessableRedditComment> {
+
+        CommentIterator(Connection connection) throws SQLException {
+            super(connection, """
+                    select submission.subreddit,
+                           comment.id,
+                           comment.author,
+                           submission.title,
+                           body,
+                           created_utc,
+                           permalink,
+                           comment.score
+                    from comment
+                    inner join submission on threadId=submission.id
+                    """);
+        }
+
+        @Override
+        ProcessableRedditComment nextFromResultSet(ResultSet resultSet) throws SQLException {
+            return new ProcessableRedditComment(
+                    resultSet.getString("subreddit"),
+                    resultSet.getString("id"),
+                    resultSet.getString("author"),
+                    resultSet.getString("title"),
+                    resultSet.getString("body"),
+                    resultSet.getInt("created_utc"),
+                    // The comment's permalink is the submission's permalink followed by the comment id
+                    resultSet.getString("permalink") + resultSet.getString("id"),
+                    resultSet.getInt("score")
+            );
+        }
+    }
+
+
+    /** Iterator over the rows of a single query. It takes ownership of the
+     *  connection it is given: {@link #close()} releases the result set, the
+     *  statement and the connection.
+     *
+     * @param <T> the type each row is mapped to
+     */
+    static abstract class SqlQueryIterator<T> implements Iterator<T>, AutoCloseable {
+        private final Connection connection;
+        private final PreparedStatement stmt;
+        private final ResultSet resultSet;
+
+        // null means "not yet peeked"; otherwise caches the result of resultSet.next()
+        private Boolean hasNext = null;
+
+        SqlQueryIterator(Connection connection, String query) throws SQLException {
+            this.connection = connection;
+
+            // This is sql-injection safe since the query is not user input:
+            stmt = connection.prepareStatement(query);
+
+            try {
+                resultSet = stmt.executeQuery();
+            }
+            catch (SQLException | RuntimeException e) {
+                // Don't leak the statement if the query fails to run
+                try {
+                    stmt.close();
+                }
+                catch (SQLException closeEx) {
+                    e.addSuppressed(closeEx);
+                }
+                throw e;
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            // Resources are closed in reverse order of declaration:
+            // result set first, then statement, then connection
+            try (connection; stmt; resultSet) {
+                // nothing to do, the block exists only to close the resources
+            }
+        }
+
+        @SneakyThrows
+        @Override
+        public boolean hasNext() {
+            if (hasNext != null)
+                return hasNext;
+
+            hasNext = resultSet.next();
+
+            return hasNext;
+        }
+
+        /** Maps the row the result set is currently positioned on to a {@code T}. */
+        abstract T nextFromResultSet(ResultSet resultSet) throws SQLException;
+
+        /** Returns the next row.
+         *
+         * @throws java.util.NoSuchElementException if the result set is exhausted
+         */
+        @SneakyThrows
+        @Override
+        public T next() {
+            if (!hasNext())
+                throw new java.util.NoSuchElementException();
+
+            // Forget the peeked state so the following hasNext() advances the cursor
+            hasNext = null;
+
+            return nextFromResultSet(resultSet);
+        }
+    }
+
+}
diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java
new file mode 100644
index 00000000..6610427a
--- /dev/null
+++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java
@@ -0,0 +1,19 @@
+package nu.marginalia.integration.reddit.model;
+
+import lombok.AllArgsConstructor;
+import lombok.ToString;
+
+/** A projection of a Reddit comment joined with its top level submission
+ 
* that is ready for processing. */ +@AllArgsConstructor +@ToString +public class ProcessableRedditComment { + public String subreddit; + public String name; + public String author; + public String title; + public String body; + public int created_utc; + public String permalink; + public int score; +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java new file mode 100644 index 00000000..bf41f0eb --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java @@ -0,0 +1,17 @@ +package nu.marginalia.integration.reddit.model; + +import lombok.AllArgsConstructor; +import lombok.ToString; + +/** A projection of a Reddit top level submission that is appropriate for processing. */ +@AllArgsConstructor @ToString +public class ProcessableRedditSubmission { + public String subreddit; + public String name; + public String author; + public String title; + public String selftext; + public int created_utc; + public String permalink; + public int score; +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java new file mode 100644 index 00000000..d35397cd --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java @@ -0,0 +1,20 @@ +package nu.marginalia.integration.reddit.model; + + +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.With; + +/** Corresponds directly to the pushshift.io Reddit comment JSON format. 
*/ +@AllArgsConstructor +@ToString +@With +public class RawRedditComment { + public String parent_id; + public String link_id; + public String id; + public String author; + public String body; + public String subreddit; + public int score; +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java new file mode 100644 index 00000000..0dea9cf8 --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java @@ -0,0 +1,22 @@ +package nu.marginalia.integration.reddit.model; + + +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.With; + +/** Corresponds directly to the pushshift.io Reddit submission JSON format. */ +@AllArgsConstructor +@With +@ToString +public class RawRedditSubmission { + public int score; + public String subreddit; + public String name; + public String author; + public String title; + public String selftext; + public int num_comments; + public int created_utc; + public String permalink; +} diff --git a/code/features-convert/reddit-json/src/main/resources/db/reddit.sql b/code/features-convert/reddit-json/src/main/resources/db/reddit.sql new file mode 100644 index 00000000..60481f5e --- /dev/null +++ b/code/features-convert/reddit-json/src/main/resources/db/reddit.sql @@ -0,0 +1,25 @@ +CREATE TABLE submission ( + id TEXT PRIMARY KEY, + author TEXT, + subreddit TEXT, + title TEXT, + selftext TEXT, + score INTEGER, + created_utc INTEGER, + num_comments INTEGER, + permalink TEXT +); + +CREATE TABLE comment ( + id TEXT PRIMARY KEY, + threadId TEXT, + author TEXT, + subreddit TEXT, + title TEXT, + body TEXT, + score INTEGER, + num_comments INTEGER +); + +CREATE INDEX submission_id ON submission(id); +CREATE INDEX comment_id ON comment(id); \ No newline at end of file diff --git 
a/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java new file mode 100644 index 00000000..7201eb33 --- /dev/null +++ b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java @@ -0,0 +1,54 @@ +package nu.marginalia.integration.reddit; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; + +class RedditEntryReaderTest { + /** This test case exists for debugging, reddit sideloading. It requires local reddit data, + * and is not part of the normal test suite. Update the path to a directory with reddit data + * in the dbPath variable. + * */ + Path dbPath = Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/"); + + @Test + void readSubmissions() throws IOException { + if (!Files.exists(dbPath)) + return; + + try (var iter = RedditEntryReader.readSubmissions(dbPath.resolve("weightroom_submissions.zst"))) { + for (int i = 0; iter.hasNext() && i<50; ) { + var entry = iter.next(); + if (entry.selftext.length() > 1000 && entry.score > 10) { + System.out.println(entry); + i++; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + void readComments() throws IOException { + if (!Files.exists(dbPath)) + return; + + try (var iter = RedditEntryReader.readComments(dbPath.resolve("weightroom_comments.zst"))) { + for (int i = 0; iter.hasNext() && i<50; ) { + var entry = iter.next(); + if (entry.body.length() > 1000 && entry.score > 10 && entry.parent_id.startsWith("t1")) { + System.out.println(entry); + i++; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git 
a/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java
new file mode 100644
index 00000000..bff2aab6
--- /dev/null
+++ b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java
@@ -0,0 +1,57 @@
+package nu.marginalia.integration.reddit.db;
+
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.ZoneOffset;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+/** Manual smoke tests for RedditDb. They read Reddit dumps from hard-coded
+ *  local paths, so every test is disabled by default. */
+class RedditDbTest {
+
+    @Disabled
+    @Test
+    void create() throws SQLException, IOException {
+        RedditDb.create(
+                Path.of("/home/vlofgren/Exports/reddit/weightroom_submissions.zst"),
+                Path.of("/home/vlofgren/Exports/reddit/weightroom_comments.zst"),
+                Path.of("/tmp/reddit.db")
+        );
+    }
+
+    @Disabled
+    @Test
+    void readSubmissions() {
+        try (var iter = RedditDb.getSubmissions(Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/weightroom.8eda94e.db"))) {
+            // Print at most ten entries; stop early if there are fewer rows
+            for (int i = 0; i < 10 && iter.hasNext(); i++) {
+                System.out.println(iter.next());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Disabled
+    @Test
+    void readComments() {
+        try (var iter = RedditDb.getComments(Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/weightroom.8eda94e.db"))) {
+            for (int i = 0; i < 10 && iter.hasNext(); i++) {
+                var entry = iter.next();
+                // Print the entry that was just fetched; calling next() a second time
+                // would skip a row and pair the output with another comment's year
+                System.out.println(entry);
+                System.out.println(LocalDate.ofInstant(Instant.ofEpochSecond(entry.created_utc), ZoneOffset.UTC).getYear());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
\ No newline at end of file
diff --git 
a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java index 16cdf2a8..411fd13c 100644 --- a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java +++ b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java @@ -1,23 +1,15 @@ package nu.marginalia.model.processed; -import blue.strategic.parquet.Dehydrator; import blue.strategic.parquet.Hydrator; -import blue.strategic.parquet.ValueWriter; import gnu.trove.list.TLongList; import gnu.trove.list.array.TLongArrayList; import lombok.*; -import org.apache.parquet.schema.MessageType; -import org.apache.parquet.schema.Types; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; - @Getter @Setter @NoArgsConstructor diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 011994ea..5b1dbde5 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -53,6 +53,7 @@ dependencies { implementation project(':code:features-convert:keyword-extraction') implementation project(':code:features-convert:summary-extraction') implementation project(':code:features-convert:stackexchange-xml') + implementation project(':code:features-convert:reddit-json') implementation project(':code:features-crawl:crawl-blocklist') implementation project(':code:features-crawl:link-parser') diff --git 
a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 5e87f688..c72e284a 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -28,6 +28,7 @@ import nu.marginalia.converting.processor.DomainProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.nio.file.Path; import java.sql.SQLException; import java.util.Collection; @@ -247,6 +248,9 @@ public class ConverterMain extends ProcessMainClass { try { var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class); + // will be null on ConvertCrawlData + final Path inputPath = request.getInputPath(); + return switch (request.action) { case ConvertCrawlData -> { var crawlData = fileStorageService.getStorage(request.crawlStorage); @@ -261,7 +265,8 @@ public class ConverterMain extends ProcessMainClass { case SideloadEncyclopedia -> { var processData = fileStorageService.getStorage(request.processedDataStorage); - yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource), request.baseUrl), + yield new SideloadAction( + sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl), processData.asPath(), msg, inbox); } @@ -269,7 +274,7 @@ public class ConverterMain extends ProcessMainClass { var processData = fileStorageService.getStorage(request.processedDataStorage); yield new SideloadAction( - sideloadSourceFactory.sideloadDirtree(Path.of(request.inputSource)), + sideloadSourceFactory.sideloadDirtree(inputPath), processData.asPath(), msg, inbox); } @@ -277,14 +282,23 @@ public class ConverterMain extends ProcessMainClass { var processData = 
fileStorageService.getStorage(request.processedDataStorage); yield new SideloadAction( - sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)), + sideloadSourceFactory.sideloadWarc(inputPath), + processData.asPath(), + msg, inbox); + } + case SideloadReddit -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadReddit(inputPath), processData.asPath(), msg, inbox); } case SideloadStackexchange -> { var processData = fileStorageService.getStorage(request.processedDataStorage); - yield new SideloadAction(sideloadSourceFactory.sideloadStackexchange(Path.of(request.inputSource)), + yield new SideloadAction( + sideloadSourceFactory.sideloadStackexchange(inputPath), processData.asPath(), msg, inbox); } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java index 058c0eba..18cf0104 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java @@ -6,8 +6,9 @@ import nu.marginalia.atags.AnchorTextKeywords; import nu.marginalia.atags.source.AnchorTagsSourceFactory; import nu.marginalia.converting.sideload.dirtree.DirtreeSideloaderFactory; import nu.marginalia.converting.sideload.encyclopedia.EncyclopediaMarginaliaNuSideloader; +import nu.marginalia.converting.sideload.reddit.RedditSideloader; import nu.marginalia.converting.sideload.stackexchange.StackexchangeSideloader; -import nu.marginalia.converting.sideload.warc.WarcSideloadFactory; +import nu.marginalia.converting.sideload.warc.WarcSideloader; import nu.marginalia.keyword.DocumentKeywordExtractor; import 
nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider; @@ -15,8 +16,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Predicate; public class SideloadSourceFactory { private final Gson gson; @@ -26,16 +29,15 @@ public class SideloadSourceFactory { private final AnchorTextKeywords anchorTextKeywords; private final AnchorTagsSourceFactory anchorTagsSourceFactory; private final DirtreeSideloaderFactory dirtreeSideloaderFactory; - private final WarcSideloadFactory warcSideloadFactory; @Inject public SideloadSourceFactory(Gson gson, SideloaderProcessing sideloaderProcessing, ThreadLocalSentenceExtractorProvider sentenceExtractorProvider, - DocumentKeywordExtractor documentKeywordExtractor, AnchorTextKeywords anchorTextKeywords, + DocumentKeywordExtractor documentKeywordExtractor, + AnchorTextKeywords anchorTextKeywords, AnchorTagsSourceFactory anchorTagsSourceFactory, - DirtreeSideloaderFactory dirtreeSideloaderFactory, - WarcSideloadFactory warcSideloadFactory) { + DirtreeSideloaderFactory dirtreeSideloaderFactory) { this.gson = gson; this.sideloaderProcessing = sideloaderProcessing; this.sentenceExtractorProvider = sentenceExtractorProvider; @@ -43,7 +45,6 @@ public class SideloadSourceFactory { this.anchorTextKeywords = anchorTextKeywords; this.anchorTagsSourceFactory = anchorTagsSourceFactory; this.dirtreeSideloaderFactory = dirtreeSideloaderFactory; - this.warcSideloadFactory = warcSideloadFactory; } public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile, String baseUrl) throws SQLException { @@ -55,24 +56,98 @@ public class SideloadSourceFactory { } public Collection sideloadWarc(Path pathToWarcFiles) throws IOException { - return warcSideloadFactory.createSideloaders(pathToWarcFiles); + return sideload(pathToWarcFiles, + new PathSuffixPredicate(".warc", 
".warc.gz"), + (Path file) -> new WarcSideloader(file, sideloaderProcessing) + ); + } + + public SideloadSource sideloadReddit(Path pathToDbFiles) throws IOException { + return sideload(pathToDbFiles, + new PathSuffixPredicate(".db"), + (List paths) -> new RedditSideloader(paths, sentenceExtractorProvider, documentKeywordExtractor)); } public Collection sideloadStackexchange(Path pathToDbFileRoot) throws IOException { - if (Files.isRegularFile(pathToDbFileRoot)) { - return List.of(new StackexchangeSideloader(pathToDbFileRoot, sentenceExtractorProvider, documentKeywordExtractor)); + return sideload(pathToDbFileRoot, + new PathSuffixPredicate(".db"), + (Path dbFile) -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor) + ); + } + + interface SideloadConstructorMany { + SideloadSource construct(List paths) throws IOException; + } + + interface SideloadConstructorSingle { + SideloadSource construct(Path paths) throws IOException; + } + + Collection sideload( + Path path, + Predicate pathPredicate, + SideloadConstructorSingle constructor + ) throws IOException { + if (Files.isRegularFile(path)) { + return List.of(constructor.construct(path)); } - else if (Files.isDirectory(pathToDbFileRoot)) { - try (var dirs = Files.walk(pathToDbFileRoot)) { - return dirs + else if (Files.isDirectory(path)) { + try (var dirs = Files.walk(path)) { + List sideloadSources = new ArrayList<>(); + dirs .filter(Files::isRegularFile) - .filter(f -> f.toFile().getName().endsWith(".db")) - .map(dbFile -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor)) - .toList(); + .filter(pathPredicate) + .forEach(file -> { + try { + sideloadSources.add(constructor.construct(file)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return sideloadSources; } } else { // unix socket, etc throw new IllegalArgumentException("Path to stackexchange db file(s) must be a file or directory"); } } + + SideloadSource 
sideload(
+            Path path,
+            Predicate<Path> pathPredicate,
+            SideloadConstructorMany constructor
+    ) throws IOException {
+        // Builds ONE sideload source from all matching files: a regular file is
+        // passed on as a single-element list (the predicate is not applied to it),
+        // a directory is walked recursively and filtered with pathPredicate.
+        if (Files.isRegularFile(path)) {
+            return constructor.construct(List.of(path));
+        }
+        else if (Files.isDirectory(path)) {
+            try (var dirs = Files.walk(path)) {
+                var paths = dirs
+                        .filter(Files::isRegularFile)
+                        .filter(pathPredicate)
+                        .toList();
+
+                return constructor.construct(paths);
+            }
+        }
+        else { // unix socket, etc
+            // This helper is shared between sideloaders, so the message must not name a specific one
+            throw new IllegalArgumentException("Sideload path must be a regular file or directory: " + path);
+        }
+    }
+
+
+    /** Accepts paths whose file name ends with any of the configured suffixes. */
+    private static class PathSuffixPredicate implements Predicate<Path> {
+        private final List<String> endings;
+
+        public PathSuffixPredicate(String... endings) {
+            this.endings = List.of(endings);
+        }
+
+        @Override
+        public boolean test(Path path) {
+            // getFileName() works on any filesystem provider, unlike toFile(),
+            // and returns null for a path with no name elements (e.g. a root)
+            Path fileNamePath = path.getFileName();
+            if (fileNamePath == null) {
+                return false;
+            }
+
+            String fileName = fileNamePath.toString();
+
+            return endings.stream().anyMatch(fileName::endsWith);
+        }
+    }
 }
diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java
new file mode 100644
index 00000000..97e519d6
--- /dev/null
+++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java
@@ -0,0 +1,195 @@
+package nu.marginalia.converting.sideload.reddit;
+
+import nu.marginalia.atags.model.DomainLinks;
+import nu.marginalia.converting.model.GeneratorType;
+import nu.marginalia.converting.model.ProcessedDocument;
+import nu.marginalia.converting.model.ProcessedDocumentDetails;
+import nu.marginalia.converting.model.ProcessedDomain;
+import nu.marginalia.converting.sideload.SideloadSource;
+import nu.marginalia.integration.reddit.db.RedditDb;
+import nu.marginalia.keyword.DocumentKeywordExtractor;
+import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider;
+import nu.marginalia.model.EdgeDomain;
+import nu.marginalia.model.EdgeUrl; 
+import nu.marginalia.model.crawl.DomainIndexingState; +import nu.marginalia.model.crawl.HtmlFeature; +import nu.marginalia.model.crawl.PubDate; +import nu.marginalia.model.crawl.UrlIndexingState; +import nu.marginalia.model.html.HtmlStandard; +import nu.marginalia.model.idx.DocumentFlags; +import nu.marginalia.model.idx.DocumentMetadata; +import nu.marginalia.model.idx.WordFlags; +import nu.marginalia.util.ProcessingIterator; +import org.apache.commons.lang3.StringUtils; +import org.jsoup.Jsoup; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.EnumSet; +import java.util.Iterator; +import java.util.List; + +public class RedditSideloader implements SideloadSource { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RedditSideloader.class); + + private final List dbFiles; + private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider; + private final DocumentKeywordExtractor keywordExtractor; + + public RedditSideloader(List listToDbFiles, + ThreadLocalSentenceExtractorProvider sentenceExtractorProvider, + DocumentKeywordExtractor keywordExtractor) { + this.dbFiles = listToDbFiles; + this.sentenceExtractorProvider = sentenceExtractorProvider; + this.keywordExtractor = keywordExtractor; + } + + @Override + public ProcessedDomain getDomain() { + var ret = new ProcessedDomain(); + + ret.domain = new EdgeDomain("old.reddit.com"); + ret.ip = "127.0.0.1"; + ret.state = DomainIndexingState.ACTIVE; + + ret.sizeloadSizeAdvice = 5_000_000; + + return ret; + } + + @Override + public Iterator getDocumentsStream() { + return ProcessingIterator.factory(24, 16).create((taskConsumer) -> { + DomainLinks domainLinks = new DomainLinks(); + + for (var dbFile : dbFiles) { + try (var submissions = RedditDb.getSubmissions(dbFile)) { + while (submissions.hasNext()) { + var entry = submissions.next(); + 
taskConsumer.accept(() -> + convertDocument(entry.selftext, + entry.subreddit, + entry.title, + entry.author, + entry.permalink, + entry.created_utc, + entry.score, + domainLinks) + ); + } + } catch (Exception e) { + logger.error("Error reading db file", e); + } + + try (var comments = RedditDb.getComments(dbFile)) { + while (comments.hasNext()) { + var entry = comments.next(); + taskConsumer.accept(() -> + convertDocument(entry.body, + entry.subreddit, + entry.title, + entry.author, + entry.permalink, + entry.created_utc, + entry.score, + domainLinks) + ); + } + } catch (Exception e) { + logger.error("Error reading db file", e); + } + } + }); + } + + private ProcessedDocument convertDocument(String body, + String subreddit, + String title, + String author, + String permalink, + int createdUtc, + int score, + DomainLinks domainLinks) throws URISyntaxException { + String fullUrl = "https://old.reddit.com" + permalink; + + StringBuilder fullHtml = new StringBuilder(); + fullHtml.append("").append(title).append(""); + fullHtml.append("

").append(title).append("

"); + fullHtml.append("

").append(body).append("

"); + fullHtml.append(""); + + var ret = new ProcessedDocument(); + try { + + var url = new EdgeUrl(fullUrl); + var doc = Jsoup.parse(fullHtml.toString()); + var dld = sentenceExtractorProvider.get().extractSentences(doc); + + ret.url = url; + ret.words = keywordExtractor.extractKeywords(dld, url); + + ret.words.addAllSyntheticTerms(List.of( + "js:true", + "site:reddit.com", + "site:old.reddit.com", + "site:www.reddit.com", + "special:ads", + "special:tracking", + "generator:forum", + subreddit + )); + + ret.words.add(subreddit, WordFlags.Subjects.asBit()); + ret.words.add("reddit", + WordFlags.ExternalLink.asBit() + | WordFlags.Subjects.asBit() + | WordFlags.Synthetic.asBit() + | WordFlags.NamesWords.asBit()); + ret.words.add(subreddit.toLowerCase(), + WordFlags.ExternalLink.asBit() + | WordFlags.NamesWords.asBit() + | WordFlags.Synthetic.asBit() + ); + if (!"[deleted]".equals(author)) + ret.words.add(author, WordFlags.NamesWords.asBit() | WordFlags.Synthetic.asBit()); + + var date = LocalDate.ofInstant( + Instant.ofEpochSecond(createdUtc), + ZoneOffset.UTC); + int year = date.getYear(); + + ret.details = new ProcessedDocumentDetails(); + ret.details.pubYear = year; + ret.details.quality = -5; + ret.details.metadata = new DocumentMetadata(3, + PubDate.toYearByte(year), + (int) -ret.details.quality, + EnumSet.of(DocumentFlags.GeneratorForum)); + ret.details.features = EnumSet.of(HtmlFeature.JS, HtmlFeature.TRACKING); + + ret.details.metadata.withSizeAndTopology(10000, score); + + ret.details.generator = GeneratorType.DOCS; + ret.details.title = StringUtils.truncate(STR."[/r/\{subreddit}] \{title}", 128); + ret.details.description = StringUtils.truncate(body, 255); + ret.details.length = 128; + + ret.details.standard = HtmlStandard.HTML5; + ret.details.feedLinks = List.of(); + ret.details.linksExternal = List.of(); + ret.details.linksInternal = List.of(); + ret.state = UrlIndexingState.OK; + ret.stateReason = "SIDELOAD"; + } + catch (Exception e) { + 
logger.warn("Failed to process document", e); + ret.url = new EdgeUrl(fullUrl); + ret.state = UrlIndexingState.DISQUALIFIED; + ret.stateReason = "SIDELOAD"; + } + return ret; + }; +} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java deleted file mode 100644 index 5ba88432..00000000 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -package nu.marginalia.converting.sideload.warc; - -import com.google.inject.Inject; -import nu.marginalia.converting.sideload.SideloadSource; -import nu.marginalia.converting.sideload.SideloaderProcessing; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -public class WarcSideloadFactory { - - private final SideloaderProcessing processing; - - @Inject - public WarcSideloadFactory(SideloaderProcessing processing) { - this.processing = processing; - } - - public Collection createSideloaders(Path pathToWarcFiles) throws IOException { - - if (Files.isRegularFile(pathToWarcFiles)) { - return List.of(new WarcSideloader(pathToWarcFiles, processing)); - } - else if (Files.isDirectory(pathToWarcFiles)) { - - final List files = new ArrayList<>(); - - try (var stream = Files.list(pathToWarcFiles)) { - stream - .filter(Files::isRegularFile) - .filter(this::isWarcFile) - .forEach(files::add); - - } - - List sources = new ArrayList<>(); - - for (Path file : files) { - sources.add(new WarcSideloader(file, processing)); - } - - return sources; - } - else { - throw new IllegalArgumentException("Path " + pathToWarcFiles + " is neither a file nor a directory"); - } - } - - private boolean isWarcFile(Path path) { - return 
path.toString().endsWith(".warc") - || path.toString().endsWith(".warc.gz"); - } -} \ No newline at end of file diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java index 97406ff0..edb670fa 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java @@ -35,9 +35,9 @@ public class WarcSideloader implements SideloadSource, AutoCloseable { private final EdgeDomain domain; + @SneakyThrows public WarcSideloader(Path warcFile, SideloaderProcessing sideloaderProcessing) - throws IOException { this.sideloaderProcessing = sideloaderProcessing; this.reader = new WarcReader(warcFile); diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java new file mode 100644 index 00000000..5d7e25b4 --- /dev/null +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java @@ -0,0 +1,55 @@ +package nu.marginalia.converting.sideload.reddit; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import nu.marginalia.ProcessConfiguration; +import nu.marginalia.converting.ConverterModule; +import nu.marginalia.converting.processor.ConverterDomainTypes; +import nu.marginalia.converting.sideload.SideloadSourceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import static org.mockito.Mockito.when; + +class 
RedditSideloaderTest extends AbstractModule { + /* This test case exists for debugging, to get deep into the Reddit sideloader and see if it can read the files. + * Update the path to the Reddit database in the dbPath variable. + * */ + private static final Path dbPath = Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/"); + + private SideloadSourceFactory sourceFactory; + @BeforeEach + public void setUp() throws IOException { + sourceFactory = Guice.createInjector(new ConverterModule(), this) + .getInstance(SideloadSourceFactory.class); + } + + public void configure() { + var domainTypesMock = Mockito.mock(ConverterDomainTypes.class); + when(domainTypesMock.isBlog(Mockito.any())).thenReturn(false); + bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("test", 1, UUID.randomUUID())); + bind(ConverterDomainTypes.class).toInstance(domainTypesMock); + } + + @Test + void getDocumentsStream() throws IOException { + if (Files.notExists(dbPath)) { + return; + } + + var sideloader = sourceFactory.sideloadReddit(dbPath); + + sideloader.getDomain(); + var stream = sideloader.getDocumentsStream(); + for (int i = 0; i < 10; i++) { + var next = stream.next(); + System.out.println(next); + } + } +} \ No newline at end of file diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index cf8acaf4..05de9b07 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -58,6 +58,9 @@ public class ControlNodeActionsService { Spark.post("/public/nodes/:node/actions/sideload-dirtree", this::sideloadDirtree, redirectControl.renderRedirectAcknowledgement("Sideloading", "..") ); + 
Spark.post("/public/nodes/:node/actions/sideload-reddit", this::sideloadReddit, + redirectControl.renderRedirectAcknowledgement("Sideloading", "..") + ); Spark.post("/public/nodes/:node/actions/sideload-warc", this::sideloadWarc, redirectControl.renderRedirectAcknowledgement("Sideloading", "..") ); @@ -141,7 +144,18 @@ public class ControlNodeActionsService { return ""; } + public Object sideloadReddit(Request request, Response response) { + final int nodeId = Integer.parseInt(request.params("node")); + + Path sourcePath = parseSourcePath(request.queryParams("source")); + + eventLog.logEvent("USER-ACTION", "SIDELOAD REDDIT " + nodeId); + + executorClient.sideloadReddit(nodeId, sourcePath); + + return ""; + } public Object sideloadWarc(Request request, Response response) { final int nodeId = Integer.parseInt(request.params("node")); diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb index ac65e966..b620bfba 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb @@ -12,10 +12,10 @@ for more information on how to set this up. 
- + {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb index 62bbce9a..39d5c686 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb @@ -22,10 +22,10 @@ - + {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb new file mode 100644 index 00000000..bf9f36b3 --- /dev/null +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb @@ -0,0 +1,36 @@ +

Sideload Reddit

+ +
+This will index a pushshift.io "top-n subreddits" dump from the Reddit API into index. +
+
+
+ + + {{#each uploadDirContents.items}} + + + + + + + {{/each}} + {{#unless uploadDirContents.items}} + + + + {{/unless}} +
FilenameSizeLast Modified
+ + {{#unless directory}}{{size}}{{/unless}}{{shortTimestamp lastModifiedTime}}
Nothing found in upload directory
+ +

+ + The upload directory is typically mounted to /uploads on the server. The external + directory is typically something like index-{{node.id}}/uploads. + +

+ + +
+
\ No newline at end of file diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb index 701fbb51..f5f73f84 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb @@ -13,12 +13,12 @@ information how to do this. FilenameSizeLast Modified {{#each uploadDirContents.items}} - - - + + + {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb index 587c02c0..7680b7b8 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb @@ -12,12 +12,12 @@ A warc export can be created using e.g. wget:

FilenameSizeLast Modified {{#each uploadDirContents.items}} - - - + + + {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb index ea4502fa..df8ed77f 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb @@ -19,6 +19,7 @@ {{#if view.sideload-stackexchange}} {{> control/node/actions/partial-sideload-stackexchange }} {{/if}} {{#if view.sideload-warc}} {{> control/node/actions/partial-sideload-warc }} {{/if}} {{#if view.sideload-dirtree}} {{> control/node/actions/partial-sideload-dirtree }} {{/if}} + {{#if view.sideload-reddit}} {{> control/node/actions/partial-sideload-reddit }} {{/if}} {{#if view.export-db-data}} {{> control/node/actions/partial-export-db-data }} {{/if}} {{#if view.export-from-crawl-data}} {{> control/node/actions/partial-export-from-crawl-data }} {{/if}} {{#if view.export-sample-data}} {{> control/node/actions/partial-export-sample-data }} {{/if}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb index de963ea3..23627155 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb @@ -24,6 +24,7 @@

  • Sideload Stackexchange
  • Sideload WARC Files
  • Sideload Dirtree
  • +
  • Sideload Reddit
  • Download Sample Crawl Data
  • Export Database Data
  • diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index e903d048..f2d9678d 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation project(':code:features-crawl:link-parser') implementation project(':code:features-convert:data-extractors') implementation project(':code:features-convert:stackexchange-xml') + implementation project(':code:features-convert:reddit-json') implementation project(':code:features-index:index-journal') implementation project(':code:api:index-api') implementation project(':code:api:query-api') diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java index aed6d05a..ee9fdba0 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java @@ -10,6 +10,7 @@ import nu.marginalia.actor.state.Resume; import nu.marginalia.encyclopedia.EncyclopediaConverter; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; +import nu.marginalia.sideload.RedditSideloadHelper; import nu.marginalia.sideload.SideloadHelper; import nu.marginalia.sideload.StackExchangeSideloadHelper; import nu.marginalia.storage.FileStorageService; @@ -38,6 +39,7 @@ public class ConvertActor extends RecordActorPrototype { public record PredigestEncyclopedia(String source, String dest, String baseUrl) implements ActorStep {}; public record ConvertDirtree(String source) implements ActorStep {}; public record ConvertWarc(String source) implements ActorStep {}; + public record ConvertReddit(String source) implements ActorStep {}; public record ConvertStackexchange(String source) implements ActorStep 
{}; @Resume(behavior = ActorResumeBehavior.RETRY) public record ConvertWait(FileStorageId destFid, @@ -96,6 +98,28 @@ public class ConvertActor extends RecordActorPrototype { mqConverterOutbox.sendAsync(ConvertRequest.forWarc(sourcePath, processedArea.id())) ); } + case ConvertReddit(String source) -> { + Path sourcePath = Path.of(source); + if (!Files.exists(sourcePath)) + yield new Error("Source path does not exist: " + sourcePath); + + String fileName = sourcePath.toFile().getName(); + + var processedArea = storageService.allocateStorage( + FileStorageType.PROCESSED_DATA, "processed-data", + "Processed Reddit Data; " + fileName); + + storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW); + + // Convert reddit data to sqlite database + // (we can't use a Predigest- step here because the conversion is too complicated) + RedditSideloadHelper.convertRedditData(sourcePath); + + yield new ConvertWait( + processedArea.id(), + mqConverterOutbox.sendAsync(ConvertRequest.forReddit(sourcePath, processedArea.id())) + ); + } case ConvertEncyclopedia(String source, String baseUrl) -> { Path sourcePath = Path.of(source); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java index 0816c51b..e0969196 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java @@ -162,6 +162,18 @@ public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase { } } + @Override + public void sideloadReddit(RpcSideloadReddit request, StreamObserver responseObserver) { + try { + sideloadService.sideloadReddit(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + 
responseObserver.onError(e); + } + } + @Override public void sideloadWarc(RpcSideloadWarc request, StreamObserver responseObserver) { try { diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java index 7daefe6e..136b6f1d 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java @@ -16,6 +16,7 @@ import java.nio.file.Path; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Comparator; public class SideloadService { private final ExecutorActorControlService actorControlService; @@ -32,6 +33,12 @@ public class SideloadService { ); } + public void sideloadReddit(RpcSideloadReddit request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertReddit(request.getSourcePath()) + ); + } + public void sideloadWarc(RpcSideloadWarc request) throws Exception { actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertWarc(request.getSourcePath()) @@ -55,7 +62,10 @@ public class SideloadService { public RpcUploadDirContents listUploadDir() throws IOException { Path uploadDir = WmsaHome.getUploadDir(); - try (var items = Files.list(uploadDir)) { + try (var items = Files.list(uploadDir).sorted( + Comparator.comparing((Path d) -> Files.isDirectory(d)).reversed() + .thenComparing(path -> path.getFileName().toString()) + )) { var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString()); var iter = items.iterator(); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java b/code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java new 
file mode 100644 index 00000000..23d7e544 --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java @@ -0,0 +1,104 @@ +package nu.marginalia.sideload; + +import nu.marginalia.integration.reddit.db.RedditDb; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; + +/** Contains helper functions for pre-converting stackexchange style 7z + * files to marginalia-digestible sqlite databases*/ +public class RedditSideloadHelper { + private static final Logger logger = LoggerFactory.getLogger(RedditSideloadHelper.class); + + /** Looks for stackexchange 7z files in the given path and converts them to sqlite databases. 
+ * The function is idempotent, so it is safe to call it multiple times on the same path + * (it will not re-convert files that have already been successfully converted) + * */ + public static void convertRedditData(Path sourcePath) { + if (!Files.isDirectory(sourcePath)) { + throw new UnsupportedOperationException("RedditSideloadHelper.convertRedditData only supports directories"); + } + + Set allFileNames; + try (var contents = Files.list(sourcePath)) { + allFileNames = contents.filter(Files::isRegularFile) + .map(Path::toFile) + .map(File::getName) + .filter(name -> name.endsWith(".zst")) + .sorted() + .collect(Collectors.toCollection(TreeSet::new)); + } catch (IOException ex) { + logger.warn("Failed to convert reddit zstd file to sqlite database", ex); + return; + } + + int parallelism = Math.clamp(ForkJoinPool.getCommonPoolParallelism(), 1, Runtime.getRuntime().availableProcessors() / 2); + try (var executor = Executors.newWorkStealingPool(parallelism)) + { + for (var fileName : allFileNames) { + if (!fileName.endsWith(RedditFilePair.submissionsSuffix)) { + continue; + } + + String baseName = fileName.substring(0, fileName.length() - RedditFilePair.submissionsSuffix.length()); + String commentsFileName = baseName + RedditFilePair.commentsSuffix; + + if (!allFileNames.contains(commentsFileName)) { + logger.warn("Skipping reddit file pair {} because it is missing the comments file", fileName); + return; + } + + executor.submit(() -> convertSingleRedditFile(new RedditFilePair(sourcePath, baseName))); + } + } + } + + record RedditFilePair(Path rootDir, String fileNameBase) { + static String submissionsSuffix = "_submissions.zst"; + static String commentsSuffix = "_comments.zst"; + + public String submissionsFileName() { return fileNameBase + submissionsSuffix; } + public String commentsFileName() { return fileNameBase + commentsSuffix; } + + public Path submissionsPath() { return rootDir.resolve(submissionsFileName()); } + public Path commentsPath() { return 
rootDir.resolve(commentsFileName()); } + } + + private static void convertSingleRedditFile(RedditFilePair files) { + try { + Path destPath = getRedditDbPath(files); + if (Files.exists(destPath)) // already converted + return; + + Path tempFile = Files.createTempFile(destPath.getParent(), "processed", "db.tmp"); + try { + logger.info("Converting reddit zstd file {} to sqlite database", files.fileNameBase); + RedditDb.create(files.submissionsPath(), files.commentsPath(), tempFile); + logger.info("Finished converting reddit zstd file {} to sqlite database", files.fileNameBase); + Files.move(tempFile, destPath, StandardCopyOption.REPLACE_EXISTING); + } catch (Exception e) { + logger.error("Failed to convert reddit zstd file to sqlite database", e); + Files.deleteIfExists(tempFile); + Files.deleteIfExists(destPath); + } + } catch (IOException ex) { + logger.warn("Failed to convert reddit zstd file to sqlite database", ex); + } + } + + private static Path getRedditDbPath(RedditFilePair pair) throws IOException { + String hash = SideloadHelper.getCrc32FileHash(pair.commentsPath()); + return pair.rootDir().resolve(STR."\{pair.fileNameBase}.\{hash}.db"); + } + +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index dcec22b4..af0cad4f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -37,6 +37,7 @@ include 'code:features-convert:adblock' include 'code:features-convert:anchor-keywords' include 'code:features-convert:data-extractors' include 'code:features-convert:stackexchange-xml' +include 'code:features-convert:reddit-json' include 'code:features-convert:pubdate' include 'code:features-convert:summary-extraction' include 'code:features-convert:keyword-extraction'