From fab36d6e636c60d5f222ae3880eb3d1011b8edfb Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 14 Feb 2024 11:11:23 +0100 Subject: [PATCH 1/3] (converter) Loader for reddit data Adds experimental sideloading support for pushshift.io style reddit data. This dataset is limited to data older than 2023, due to licensing changes making large-scale data extraction difficult. Since the median post quality on reddit is not very good, the sideloader will only load a subset of self-texts and top-level comments that have sufficiently many upvotes. Empirically this appears to mostly return good matches, even if it probably could index more. Tests were written for this, but all require local reddit data which can't be distributed with the source code. If these cannot be found, the tests will short-circuit as OK. They're mostly there for debugging, and it's fine if they don't always run. The change also refactors the sideloading a bit since it was a bit messy. --- .../executor/client/ExecutorClient.java | 8 +- .../src/main/protobuf/executor-api.proto | 4 + .../mqapi/converting/ConvertAction.java | 1 + .../mqapi/converting/ConvertRequest.java | 16 +- .../features-convert/reddit-json/build.gradle | 39 ++++ .../integration/reddit/RedditEntryReader.java | 60 ++++++ .../integration/reddit/db/RedditDb.java | 203 ++++++++++++++++++ .../model/ProcessableRedditComment.java | 19 ++ .../model/ProcessableRedditSubmission.java | 17 ++ .../reddit/model/RawRedditComment.java | 20 ++ .../reddit/model/RawRedditSubmission.java | 22 ++ .../src/main/resources/db/reddit.sql | 25 +++ .../reddit/RedditEntryReaderTest.java | 54 +++++ .../integration/reddit/db/RedditDbTest.java | 56 +++++ .../DocumentRecordKeywordsProjection.java | 8 - .../processes/converting-process/build.gradle | 1 + .../marginalia/converting/ConverterMain.java | 22 +- .../sideload/SideloadSourceFactory.java | 105 +++++++-- .../sideload/reddit/RedditSideloader.java | 195 +++++++++++++++++ 
.../sideload/warc/WarcSideloadFactory.java | 57 ----- .../sideload/warc/WarcSideloader.java | 2 +- .../sideload/reddit/RedditSideloaderTest.java | 55 +++++ .../node/svc/ControlNodeActionsService.java | 14 ++ .../node/actions/partial-sideload-reddit.hdb | 36 ++++ .../templates/control/node/node-actions.hdb | 1 + .../control/node/partial-node-nav.hdb | 1 + .../executor-service/build.gradle | 1 + .../marginalia/actor/task/ConvertActor.java | 24 +++ .../executor/ExecutorGrpcService.java | 12 ++ .../executor/svc/SideloadService.java | 6 + .../sideload/RedditSideloadHelper.java | 104 +++++++++ settings.gradle | 1 + 32 files changed, 1102 insertions(+), 87 deletions(-) create mode 100644 code/features-convert/reddit-json/build.gradle create mode 100644 code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java create mode 100644 code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java create mode 100644 code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java create mode 100644 code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java create mode 100644 code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java create mode 100644 code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java create mode 100644 code/features-convert/reddit-json/src/main/resources/db/reddit.sql create mode 100644 code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java create mode 100644 code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java create mode 100644 code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java delete mode 100644 
code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java create mode 100644 code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java create mode 100644 code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb create mode 100644 code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java diff --git a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java index c9b286d5..7f301261 100644 --- a/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java +++ b/code/api/executor-api/src/main/java/nu/marginalia/executor/client/ExecutorClient.java @@ -142,7 +142,13 @@ public class ExecutorClient extends AbstractDynamicClient { .build() ); } - + public void sideloadReddit(int node, Path sourcePath) { + stubPool.apiForNode(node).sideloadReddit( + RpcSideloadReddit.newBuilder() + .setSourcePath(sourcePath.toString()) + .build() + ); + } public void sideloadWarc(int node, Path sourcePath) { stubPool.apiForNode(node).sideloadWarc( RpcSideloadWarc.newBuilder() diff --git a/code/api/executor-api/src/main/protobuf/executor-api.proto b/code/api/executor-api/src/main/protobuf/executor-api.proto index bc05844e..2c2756f2 100644 --- a/code/api/executor-api/src/main/protobuf/executor-api.proto +++ b/code/api/executor-api/src/main/protobuf/executor-api.proto @@ -20,6 +20,7 @@ service ExecutorApi { rpc sideloadEncyclopedia(RpcSideloadEncyclopedia) returns (Empty) {} rpc sideloadDirtree(RpcSideloadDirtree) returns (Empty) {} rpc sideloadWarc(RpcSideloadWarc) returns (Empty) {} + rpc sideloadReddit(RpcSideloadReddit) returns (Empty) {} rpc sideloadStackexchange(RpcSideloadStackexchange) returns (Empty) {} rpc 
createCrawlSpecFromDownload(RpcCrawlSpecFromDownload) returns (Empty) {} @@ -58,6 +59,9 @@ message RpcSideloadDirtree { message RpcSideloadWarc { string sourcePath = 1; } +message RpcSideloadReddit { + string sourcePath = 1; +} message RpcSideloadStackexchange { string sourcePath = 1; } diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java index 17102c06..5ab8e4ce 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertAction.java @@ -5,5 +5,6 @@ public enum ConvertAction { SideloadEncyclopedia, SideloadDirtree, SideloadWarc, + SideloadReddit, SideloadStackexchange } diff --git a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java index cf445e5a..51d678e3 100644 --- a/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java +++ b/code/api/process-mqapi/src/main/java/nu/marginalia/mqapi/converting/ConvertRequest.java @@ -1,6 +1,7 @@ package nu.marginalia.mqapi.converting; import lombok.AllArgsConstructor; +import lombok.NonNull; import nu.marginalia.storage.model.FileStorageId; import java.nio.file.Path; @@ -13,6 +14,13 @@ public class ConvertRequest { public final FileStorageId processedDataStorage; public final String baseUrl; + public Path getInputPath() { + if (inputSource == null) + return null; + + return Path.of(inputSource); + } + public static ConvertRequest forCrawlData(FileStorageId sourceId, FileStorageId destId) { return new ConvertRequest( ConvertAction.ConvertCrawlData, @@ -45,7 +53,13 @@ public class ConvertRequest { destId, null); } - + public static ConvertRequest forReddit(Path sourcePath, FileStorageId destId) { + return new 
ConvertRequest(ConvertAction.SideloadReddit, + sourcePath.toString(), + null, + destId, + null); + } public static ConvertRequest forStackexchange(Path sourcePath, FileStorageId destId) { return new ConvertRequest(ConvertAction.SideloadStackexchange, sourcePath.toString(), diff --git a/code/features-convert/reddit-json/build.gradle b/code/features-convert/reddit-json/build.gradle new file mode 100644 index 00000000..08420127 --- /dev/null +++ b/code/features-convert/reddit-json/build.gradle @@ -0,0 +1,39 @@ +plugins { + id 'java' + + id 'jvm-test-suite' +} + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(21)) + } +} + +dependencies { + implementation libs.bundles.slf4j + + implementation project(':code:libraries:blocking-thread-pool') + implementation project(':code:common:model') + implementation libs.notnull + + implementation libs.jsoup + implementation libs.sqlite + + implementation libs.guice + implementation libs.guava + implementation libs.gson + implementation libs.zstd + implementation libs.trove + implementation libs.commons.compress + implementation libs.xz + + testImplementation libs.bundles.slf4j.test + testImplementation libs.bundles.junit + testImplementation libs.mockito +} + +test { + maxHeapSize = "8G" + useJUnitPlatform() +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java new file mode 100644 index 00000000..af4ceee9 --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/RedditEntryReader.java @@ -0,0 +1,60 @@ +package nu.marginalia.integration.reddit; + +import com.github.luben.zstd.ZstdInputStream; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.stream.JsonReader; +import nu.marginalia.integration.reddit.model.RawRedditComment; +import 
nu.marginalia.integration.reddit.model.RawRedditSubmission; + +import java.io.IOException; +import java.io.InputStreamReader; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.Iterator; + +public class RedditEntryReader { + public static EntryIterator readSubmissions(Path file) throws IOException { + return new EntryIterator<>(file, RawRedditSubmission.class); + } + + public static EntryIterator readComments(Path file) throws IOException { + return new EntryIterator<>(file, RawRedditComment.class); + } + + public static class EntryIterator implements Iterator, AutoCloseable { + private final JsonReader reader; + private final Class type; + private final Gson gson = new GsonBuilder().create(); + + private EntryIterator(Path file, Class type) throws IOException { + this.type = type; + + reader = new JsonReader(new InputStreamReader(new ZstdInputStream(Files.newInputStream(file, StandardOpenOption.READ)))); + + // Set the reader to be lenient to allow multiple top level objects + reader.setLenient(true); + } + + @Override + public boolean hasNext() { + try { + return reader.hasNext(); + } catch (IOException e) { + e.printStackTrace(); + return false; + } + } + + @Override + public T next() { + return gson.fromJson(reader, type); + } + + @Override + public void close() throws IOException { + reader.close(); + } + } +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java new file mode 100644 index 00000000..a4612631 --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/db/RedditDb.java @@ -0,0 +1,203 @@ +package nu.marginalia.integration.reddit.db; + +import com.google.common.base.Strings; +import lombok.SneakyThrows; +import nu.marginalia.integration.reddit.RedditEntryReader; +import 
nu.marginalia.integration.reddit.model.ProcessableRedditComment; +import nu.marginalia.integration.reddit.model.ProcessableRedditSubmission; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.sql.*; +import java.util.Iterator; + +/** This class is responsible for creating and accessing a sqlite database with reddit data, for + * easier aggregation and subsequent processing. */ +public class RedditDb { + public static void create(Path submissionsFile, Path commentsFile, Path dbFile) throws IOException, SQLException + { + Files.deleteIfExists(dbFile); + + try (var connection = DriverManager.getConnection(STR."jdbc:sqlite:\{dbFile}"); + var stream = ClassLoader.getSystemResourceAsStream("db/reddit.sql"); + var updateStmt = connection.createStatement() + ) { + var sql = new String(stream.readAllBytes()); + + String[] sqlParts = sql.split(";"); + for (var part : sqlParts) { + if (part.isBlank()) { + continue; + } + updateStmt.executeUpdate(part); + } + updateStmt.execute("PRAGMA synchronous = OFF"); + + + try (var iter = RedditEntryReader.readSubmissions(submissionsFile); + var stmt = connection.prepareStatement(""" + INSERT OR IGNORE INTO submission(id, author, created_utc, score, title, selftext, subreddit, permalink) + VALUES (?, ?, ?, ?, ?, ?, ?, ?) 
+ """) + ) { + while (iter.hasNext()) { + var submission = iter.next(); + + if (Strings.isNullOrEmpty(submission.name)) + continue; + + stmt.setString(1, submission.name); + stmt.setString(2, submission.author); + stmt.setInt(3, submission.created_utc); + stmt.setInt(4, submission.score); + stmt.setString(5, submission.title); + if (submission.score > 10 && submission.selftext.length() > 1000) { + stmt.setString(6, submission.selftext); + } else { + stmt.setString(6, ""); + } + stmt.setString(7, submission.subreddit); + stmt.setString(8, submission.permalink); + stmt.executeUpdate(); + } + + } + try (var iter = RedditEntryReader.readComments(commentsFile); + var stmt = connection.prepareStatement(""" + INSERT OR IGNORE INTO comment(id, author, score, body, threadId) + VALUES (?, ?, ?, ?, ?) + """) + ) + { + while (iter.hasNext()) { + var comment = iter.next(); + + if (comment.body.length() < 1000) continue; + if (comment.score < 10) continue; + + // We only want to store top-level comments + if (!comment.parent_id.startsWith("t3")) continue; + + stmt.setString(1, comment.id); + stmt.setString(2, comment.author); + stmt.setInt(3, comment.score); + stmt.setString(4, comment.body); + stmt.setString(5, comment.parent_id); + stmt.executeUpdate(); + } + } + } + } + + public static SubmissionIterator getSubmissions(Path file) throws SQLException { + var connection = DriverManager.getConnection(STR."jdbc:sqlite:\{file}"); + + return new SubmissionIterator(connection); + } + public static CommentIterator getComments(Path file) throws SQLException { + var connection = DriverManager.getConnection(STR."jdbc:sqlite:\{file}"); + + return new CommentIterator(connection); + } + + public static class SubmissionIterator extends SqlQueryIterator { + + SubmissionIterator(Connection connection) throws SQLException { + super(connection, """ + SELECT id, author, created_utc, score, title, selftext, subreddit, permalink + FROM submission + WHERE length(selftext) > 0 + """); + } + + 
@Override + ProcessableRedditSubmission nextFromResultSet(ResultSet resultSet) throws SQLException { + return new ProcessableRedditSubmission(resultSet.getString("subreddit"), + resultSet.getString("id"), + resultSet.getString("author"), + resultSet.getString("title"), + resultSet.getString("selftext"), + resultSet.getInt("created_utc"), + resultSet.getString("permalink"), + resultSet.getInt("score") + ); + } + } + + public static class CommentIterator extends SqlQueryIterator { + + CommentIterator(Connection connection) throws SQLException { + super(connection, """ + select submission.subreddit, + comment.id, + comment.author, + submission.title, + body, + created_utc, + permalink, + comment.score + from comment + inner join submission on threadId=submission.id + """); + } + + @Override + ProcessableRedditComment nextFromResultSet(ResultSet resultSet) throws SQLException { + return new ProcessableRedditComment( + resultSet.getString("subreddit"), + resultSet.getString("id"), + resultSet.getString("author"), + resultSet.getString("title"), + resultSet.getString("body"), + resultSet.getInt("created_utc"), + resultSet.getString("permalink") + resultSet.getString("id"), + resultSet.getInt("score") + ); + } + } + + + static abstract class SqlQueryIterator implements Iterator, AutoCloseable { + private final PreparedStatement stmt; + private final ResultSet resultSet; + + private Boolean hasNext = null; + SqlQueryIterator(Connection connection, String query) throws SQLException { + // This is sql-injection safe since the query is not user input: + stmt = connection.prepareStatement(query); + + resultSet = stmt.executeQuery(); + } + @Override + public void close() throws Exception { + resultSet.close(); + stmt.close(); + } + + @SneakyThrows + @Override + public boolean hasNext() { + if (hasNext != null) + return hasNext; + + hasNext = resultSet.next(); + + return hasNext; + } + + abstract T nextFromResultSet(ResultSet resultSet) throws SQLException; + + @SneakyThrows + 
@Override + public T next() { + if (!hasNext()) + throw new IllegalStateException(); + else hasNext = null; + + return nextFromResultSet(resultSet); + + } + } + +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java new file mode 100644 index 00000000..6610427a --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditComment.java @@ -0,0 +1,19 @@ +package nu.marginalia.integration.reddit.model; + +import lombok.AllArgsConstructor; +import lombok.ToString; + +/** A projection of a Reddit comment joined with its top level submission + * that is ready for processing. */ +@AllArgsConstructor +@ToString +public class ProcessableRedditComment { + public String subreddit; + public String name; + public String author; + public String title; + public String body; + public int created_utc; + public String permalink; + public int score; +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java new file mode 100644 index 00000000..bf41f0eb --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/ProcessableRedditSubmission.java @@ -0,0 +1,17 @@ +package nu.marginalia.integration.reddit.model; + +import lombok.AllArgsConstructor; +import lombok.ToString; + +/** A projection of a Reddit top level submission that is appropriate for processing. 
*/ +@AllArgsConstructor @ToString +public class ProcessableRedditSubmission { + public String subreddit; + public String name; + public String author; + public String title; + public String selftext; + public int created_utc; + public String permalink; + public int score; +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java new file mode 100644 index 00000000..d35397cd --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditComment.java @@ -0,0 +1,20 @@ +package nu.marginalia.integration.reddit.model; + + +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.With; + +/** Corresponds directly to the pushshift.io Reddit comment JSON format. */ +@AllArgsConstructor +@ToString +@With +public class RawRedditComment { + public String parent_id; + public String link_id; + public String id; + public String author; + public String body; + public String subreddit; + public int score; +} diff --git a/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java new file mode 100644 index 00000000..0dea9cf8 --- /dev/null +++ b/code/features-convert/reddit-json/src/main/java/nu/marginalia/integration/reddit/model/RawRedditSubmission.java @@ -0,0 +1,22 @@ +package nu.marginalia.integration.reddit.model; + + +import lombok.AllArgsConstructor; +import lombok.ToString; +import lombok.With; + +/** Corresponds directly to the pushshift.io Reddit submission JSON format. 
*/ +@AllArgsConstructor +@With +@ToString +public class RawRedditSubmission { + public int score; + public String subreddit; + public String name; + public String author; + public String title; + public String selftext; + public int num_comments; + public int created_utc; + public String permalink; +} diff --git a/code/features-convert/reddit-json/src/main/resources/db/reddit.sql b/code/features-convert/reddit-json/src/main/resources/db/reddit.sql new file mode 100644 index 00000000..60481f5e --- /dev/null +++ b/code/features-convert/reddit-json/src/main/resources/db/reddit.sql @@ -0,0 +1,25 @@ +CREATE TABLE submission ( + id TEXT PRIMARY KEY, + author TEXT, + subreddit TEXT, + title TEXT, + selftext TEXT, + score INTEGER, + created_utc INTEGER, + num_comments INTEGER, + permalink TEXT +); + +CREATE TABLE comment ( + id TEXT PRIMARY KEY, + threadId TEXT, + author TEXT, + subreddit TEXT, + title TEXT, + body TEXT, + score INTEGER, + num_comments INTEGER +); + +CREATE INDEX submission_id ON submission(id); +CREATE INDEX comment_id ON comment(id); \ No newline at end of file diff --git a/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java new file mode 100644 index 00000000..7201eb33 --- /dev/null +++ b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/RedditEntryReaderTest.java @@ -0,0 +1,54 @@ +package nu.marginalia.integration.reddit; + +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; + +class RedditEntryReaderTest { + /** This test case exists for debugging, reddit sideloading. It requires local reddit data, + * and is not part of the normal test suite. 
Update the path to a directory with reddit data + * in the dbPath variable. + * */ + Path dbPath = Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/"); + + @Test + void readSubmissions() throws IOException { + if (!Files.exists(dbPath)) + return; + + try (var iter = RedditEntryReader.readSubmissions(dbPath.resolve("weightroom_submissions.zst"))) { + for (int i = 0; iter.hasNext() && i<50; ) { + var entry = iter.next(); + if (entry.selftext.length() > 1000 && entry.score > 10) { + System.out.println(entry); + i++; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + void readComments() throws IOException { + if (!Files.exists(dbPath)) + return; + + try (var iter = RedditEntryReader.readComments(dbPath.resolve("weightroom_comments.zst"))) { + for (int i = 0; iter.hasNext() && i<50; ) { + var entry = iter.next(); + if (entry.body.length() > 1000 && entry.score > 10 && entry.parent_id.startsWith("t1")) { + System.out.println(entry); + i++; + } + } + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java new file mode 100644 index 00000000..bff2aab6 --- /dev/null +++ b/code/features-convert/reddit-json/src/test/java/nu/marginalia/integration/reddit/db/RedditDbTest.java @@ -0,0 +1,56 @@ +package nu.marginalia.integration.reddit.db; + +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.nio.file.Path; +import java.sql.SQLException; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; + +import static org.junit.jupiter.api.Assertions.*; + +class RedditDbTest { + + @Disabled + @Test + void create() throws SQLException, IOException { + RedditDb.create( + 
Path.of("/home/vlofgren/Exports/reddit/weightroom_submissions.zst"), + Path.of("/home/vlofgren/Exports/reddit/weightroom_comments.zst"), + Path.of("/tmp/reddit.db") + ); + } + + @Disabled + @Test + void readSubmissions() { + try (var iter = RedditDb.getSubmissions(Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/weightroom.8eda94e.db"))) { + for (int i = 0; i < 10; i++) { + System.out.println(iter.next()); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Disabled + @Test + void readComments() { + try (var iter = RedditDb.getComments(Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/weightroom.8eda94e.db"))) { + for (int i = 0; i < 10; i++) { + var entry = iter.next(); + System.out.println(iter.next()); + System.out.println(LocalDate.ofInstant(Instant.ofEpochSecond(entry.created_utc), ZoneOffset.UTC).getYear()); + } + } catch (SQLException e) { + throw new RuntimeException(e); + } catch (Exception e) { + throw new RuntimeException(e); + } + } +} \ No newline at end of file diff --git a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java index 16cdf2a8..411fd13c 100644 --- a/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java +++ b/code/process-models/processed-data/src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java @@ -1,23 +1,15 @@ package nu.marginalia.model.processed; -import blue.strategic.parquet.Dehydrator; import blue.strategic.parquet.Hydrator; -import blue.strategic.parquet.ValueWriter; import gnu.trove.list.TLongList; import gnu.trove.list.array.TLongArrayList; import lombok.*; -import org.apache.parquet.schema.MessageType; -import 
org.apache.parquet.schema.Types; import org.jetbrains.annotations.NotNull; -import org.jetbrains.annotations.Nullable; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType; -import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; - @Getter @Setter @NoArgsConstructor diff --git a/code/processes/converting-process/build.gradle b/code/processes/converting-process/build.gradle index 011994ea..5b1dbde5 100644 --- a/code/processes/converting-process/build.gradle +++ b/code/processes/converting-process/build.gradle @@ -53,6 +53,7 @@ dependencies { implementation project(':code:features-convert:keyword-extraction') implementation project(':code:features-convert:summary-extraction') implementation project(':code:features-convert:stackexchange-xml') + implementation project(':code:features-convert:reddit-json') implementation project(':code:features-crawl:crawl-blocklist') implementation project(':code:features-crawl:link-parser') diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java index 5e87f688..c72e284a 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/ConverterMain.java @@ -28,6 +28,7 @@ import nu.marginalia.converting.processor.DomainProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.nio.file.Path; import java.sql.SQLException; import java.util.Collection; @@ -247,6 +248,9 @@ public class ConverterMain extends ProcessMainClass { try { var request = gson.fromJson(msg.payload(), nu.marginalia.mqapi.converting.ConvertRequest.class); + // will be null on ConvertCrawlData + final Path inputPath = 
request.getInputPath(); + return switch (request.action) { case ConvertCrawlData -> { var crawlData = fileStorageService.getStorage(request.crawlStorage); @@ -261,7 +265,8 @@ public class ConverterMain extends ProcessMainClass { case SideloadEncyclopedia -> { var processData = fileStorageService.getStorage(request.processedDataStorage); - yield new SideloadAction(sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(Path.of(request.inputSource), request.baseUrl), + yield new SideloadAction( + sideloadSourceFactory.sideloadEncyclopediaMarginaliaNu(inputPath, request.baseUrl), processData.asPath(), msg, inbox); } @@ -269,7 +274,7 @@ public class ConverterMain extends ProcessMainClass { var processData = fileStorageService.getStorage(request.processedDataStorage); yield new SideloadAction( - sideloadSourceFactory.sideloadDirtree(Path.of(request.inputSource)), + sideloadSourceFactory.sideloadDirtree(inputPath), processData.asPath(), msg, inbox); } @@ -277,14 +282,23 @@ public class ConverterMain extends ProcessMainClass { var processData = fileStorageService.getStorage(request.processedDataStorage); yield new SideloadAction( - sideloadSourceFactory.sideloadWarc(Path.of(request.inputSource)), + sideloadSourceFactory.sideloadWarc(inputPath), + processData.asPath(), + msg, inbox); + } + case SideloadReddit -> { + var processData = fileStorageService.getStorage(request.processedDataStorage); + + yield new SideloadAction( + sideloadSourceFactory.sideloadReddit(inputPath), processData.asPath(), msg, inbox); } case SideloadStackexchange -> { var processData = fileStorageService.getStorage(request.processedDataStorage); - yield new SideloadAction(sideloadSourceFactory.sideloadStackexchange(Path.of(request.inputSource)), + yield new SideloadAction( + sideloadSourceFactory.sideloadStackexchange(inputPath), processData.asPath(), msg, inbox); } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java 
b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java index 058c0eba..18cf0104 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/SideloadSourceFactory.java @@ -6,8 +6,9 @@ import nu.marginalia.atags.AnchorTextKeywords; import nu.marginalia.atags.source.AnchorTagsSourceFactory; import nu.marginalia.converting.sideload.dirtree.DirtreeSideloaderFactory; import nu.marginalia.converting.sideload.encyclopedia.EncyclopediaMarginaliaNuSideloader; +import nu.marginalia.converting.sideload.reddit.RedditSideloader; import nu.marginalia.converting.sideload.stackexchange.StackexchangeSideloader; -import nu.marginalia.converting.sideload.warc.WarcSideloadFactory; +import nu.marginalia.converting.sideload.warc.WarcSideloader; import nu.marginalia.keyword.DocumentKeywordExtractor; import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider; @@ -15,8 +16,10 @@ import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; import java.sql.SQLException; +import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.function.Predicate; public class SideloadSourceFactory { private final Gson gson; @@ -26,16 +29,15 @@ public class SideloadSourceFactory { private final AnchorTextKeywords anchorTextKeywords; private final AnchorTagsSourceFactory anchorTagsSourceFactory; private final DirtreeSideloaderFactory dirtreeSideloaderFactory; - private final WarcSideloadFactory warcSideloadFactory; @Inject public SideloadSourceFactory(Gson gson, SideloaderProcessing sideloaderProcessing, ThreadLocalSentenceExtractorProvider sentenceExtractorProvider, - DocumentKeywordExtractor documentKeywordExtractor, AnchorTextKeywords anchorTextKeywords, + DocumentKeywordExtractor documentKeywordExtractor, + AnchorTextKeywords 
anchorTextKeywords, AnchorTagsSourceFactory anchorTagsSourceFactory, - DirtreeSideloaderFactory dirtreeSideloaderFactory, - WarcSideloadFactory warcSideloadFactory) { + DirtreeSideloaderFactory dirtreeSideloaderFactory) { this.gson = gson; this.sideloaderProcessing = sideloaderProcessing; this.sentenceExtractorProvider = sentenceExtractorProvider; @@ -43,7 +45,6 @@ public class SideloadSourceFactory { this.anchorTextKeywords = anchorTextKeywords; this.anchorTagsSourceFactory = anchorTagsSourceFactory; this.dirtreeSideloaderFactory = dirtreeSideloaderFactory; - this.warcSideloadFactory = warcSideloadFactory; } public SideloadSource sideloadEncyclopediaMarginaliaNu(Path pathToDbFile, String baseUrl) throws SQLException { @@ -55,24 +56,98 @@ public class SideloadSourceFactory { } public Collection sideloadWarc(Path pathToWarcFiles) throws IOException { - return warcSideloadFactory.createSideloaders(pathToWarcFiles); + return sideload(pathToWarcFiles, + new PathSuffixPredicate(".warc", ".warc.gz"), + (Path file) -> new WarcSideloader(file, sideloaderProcessing) + ); + } + + public SideloadSource sideloadReddit(Path pathToDbFiles) throws IOException { + return sideload(pathToDbFiles, + new PathSuffixPredicate(".db"), + (List paths) -> new RedditSideloader(paths, sentenceExtractorProvider, documentKeywordExtractor)); } public Collection sideloadStackexchange(Path pathToDbFileRoot) throws IOException { - if (Files.isRegularFile(pathToDbFileRoot)) { - return List.of(new StackexchangeSideloader(pathToDbFileRoot, sentenceExtractorProvider, documentKeywordExtractor)); + return sideload(pathToDbFileRoot, + new PathSuffixPredicate(".db"), + (Path dbFile) -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor) + ); + } + + interface SideloadConstructorMany { + SideloadSource construct(List paths) throws IOException; + } + + interface SideloadConstructorSingle { + SideloadSource construct(Path paths) throws IOException; + } + + Collection 
sideload( + Path path, + Predicate pathPredicate, + SideloadConstructorSingle constructor + ) throws IOException { + if (Files.isRegularFile(path)) { + return List.of(constructor.construct(path)); } - else if (Files.isDirectory(pathToDbFileRoot)) { - try (var dirs = Files.walk(pathToDbFileRoot)) { - return dirs + else if (Files.isDirectory(path)) { + try (var dirs = Files.walk(path)) { + List sideloadSources = new ArrayList<>(); + dirs .filter(Files::isRegularFile) - .filter(f -> f.toFile().getName().endsWith(".db")) - .map(dbFile -> new StackexchangeSideloader(dbFile, sentenceExtractorProvider, documentKeywordExtractor)) - .toList(); + .filter(pathPredicate) + .forEach(file -> { + try { + sideloadSources.add(constructor.construct(file)); + } catch (IOException e) { + throw new RuntimeException(e); + } + }); + return sideloadSources; } } else { // unix socket, etc throw new IllegalArgumentException("Path to stackexchange db file(s) must be a file or directory"); } } + + SideloadSource sideload( + Path path, + Predicate pathPredicate, + SideloadConstructorMany constructor + ) throws IOException { + if (Files.isRegularFile(path)) { + return constructor.construct(List.of(path)); + } + else if (Files.isDirectory(path)) { + try (var dirs = Files.walk(path)) { + var paths = dirs + .filter(Files::isRegularFile) + .filter(pathPredicate) + .toList(); + + return constructor.construct(paths); + } + } + else { // unix socket, etc + throw new IllegalArgumentException("Path to stackexchange db file(s) must be a file or directory"); + } + } + + + private static class PathSuffixPredicate implements Predicate { + private final List endings; + + public PathSuffixPredicate(String... 
endings) { + this.endings = List.of(endings); + } + + @Override + public boolean test(Path path) { + String fileName = path.toFile().getName(); + + return endings.stream().anyMatch(fileName::endsWith); + } + } } diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java new file mode 100644 index 00000000..97e519d6 --- /dev/null +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/reddit/RedditSideloader.java @@ -0,0 +1,195 @@ +package nu.marginalia.converting.sideload.reddit; + +import nu.marginalia.atags.model.DomainLinks; +import nu.marginalia.converting.model.GeneratorType; +import nu.marginalia.converting.model.ProcessedDocument; +import nu.marginalia.converting.model.ProcessedDocumentDetails; +import nu.marginalia.converting.model.ProcessedDomain; +import nu.marginalia.converting.sideload.SideloadSource; +import nu.marginalia.integration.reddit.db.RedditDb; +import nu.marginalia.keyword.DocumentKeywordExtractor; +import nu.marginalia.language.sentence.ThreadLocalSentenceExtractorProvider; +import nu.marginalia.model.EdgeDomain; +import nu.marginalia.model.EdgeUrl; +import nu.marginalia.model.crawl.DomainIndexingState; +import nu.marginalia.model.crawl.HtmlFeature; +import nu.marginalia.model.crawl.PubDate; +import nu.marginalia.model.crawl.UrlIndexingState; +import nu.marginalia.model.html.HtmlStandard; +import nu.marginalia.model.idx.DocumentFlags; +import nu.marginalia.model.idx.DocumentMetadata; +import nu.marginalia.model.idx.WordFlags; +import nu.marginalia.util.ProcessingIterator; +import org.apache.commons.lang3.StringUtils; +import org.jsoup.Jsoup; + +import java.net.URISyntaxException; +import java.nio.file.Path; +import java.time.Instant; +import java.time.LocalDate; +import java.time.ZoneOffset; +import java.util.EnumSet; +import 
java.util.Iterator; +import java.util.List; + +public class RedditSideloader implements SideloadSource { + private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RedditSideloader.class); + + private final List dbFiles; + private final ThreadLocalSentenceExtractorProvider sentenceExtractorProvider; + private final DocumentKeywordExtractor keywordExtractor; + + public RedditSideloader(List listToDbFiles, + ThreadLocalSentenceExtractorProvider sentenceExtractorProvider, + DocumentKeywordExtractor keywordExtractor) { + this.dbFiles = listToDbFiles; + this.sentenceExtractorProvider = sentenceExtractorProvider; + this.keywordExtractor = keywordExtractor; + } + + @Override + public ProcessedDomain getDomain() { + var ret = new ProcessedDomain(); + + ret.domain = new EdgeDomain("old.reddit.com"); + ret.ip = "127.0.0.1"; + ret.state = DomainIndexingState.ACTIVE; + + ret.sizeloadSizeAdvice = 5_000_000; + + return ret; + } + + @Override + public Iterator getDocumentsStream() { + return ProcessingIterator.factory(24, 16).create((taskConsumer) -> { + DomainLinks domainLinks = new DomainLinks(); + + for (var dbFile : dbFiles) { + try (var submissions = RedditDb.getSubmissions(dbFile)) { + while (submissions.hasNext()) { + var entry = submissions.next(); + taskConsumer.accept(() -> + convertDocument(entry.selftext, + entry.subreddit, + entry.title, + entry.author, + entry.permalink, + entry.created_utc, + entry.score, + domainLinks) + ); + } + } catch (Exception e) { + logger.error("Error reading db file", e); + } + + try (var comments = RedditDb.getComments(dbFile)) { + while (comments.hasNext()) { + var entry = comments.next(); + taskConsumer.accept(() -> + convertDocument(entry.body, + entry.subreddit, + entry.title, + entry.author, + entry.permalink, + entry.created_utc, + entry.score, + domainLinks) + ); + } + } catch (Exception e) { + logger.error("Error reading db file", e); + } + } + }); + } + + private ProcessedDocument convertDocument(String 
body, + String subreddit, + String title, + String author, + String permalink, + int createdUtc, + int score, + DomainLinks domainLinks) throws URISyntaxException { + String fullUrl = "https://old.reddit.com" + permalink; + + StringBuilder fullHtml = new StringBuilder(); + fullHtml.append("").append(title).append(""); + fullHtml.append("

").append(title).append("

"); + fullHtml.append("

").append(body).append("

"); + fullHtml.append(""); + + var ret = new ProcessedDocument(); + try { + + var url = new EdgeUrl(fullUrl); + var doc = Jsoup.parse(fullHtml.toString()); + var dld = sentenceExtractorProvider.get().extractSentences(doc); + + ret.url = url; + ret.words = keywordExtractor.extractKeywords(dld, url); + + ret.words.addAllSyntheticTerms(List.of( + "js:true", + "site:reddit.com", + "site:old.reddit.com", + "site:www.reddit.com", + "special:ads", + "special:tracking", + "generator:forum", + subreddit + )); + + ret.words.add(subreddit, WordFlags.Subjects.asBit()); + ret.words.add("reddit", + WordFlags.ExternalLink.asBit() + | WordFlags.Subjects.asBit() + | WordFlags.Synthetic.asBit() + | WordFlags.NamesWords.asBit()); + ret.words.add(subreddit.toLowerCase(), + WordFlags.ExternalLink.asBit() + | WordFlags.NamesWords.asBit() + | WordFlags.Synthetic.asBit() + ); + if (!"[deleted]".equals(author)) + ret.words.add(author, WordFlags.NamesWords.asBit() | WordFlags.Synthetic.asBit()); + + var date = LocalDate.ofInstant( + Instant.ofEpochSecond(createdUtc), + ZoneOffset.UTC); + int year = date.getYear(); + + ret.details = new ProcessedDocumentDetails(); + ret.details.pubYear = year; + ret.details.quality = -5; + ret.details.metadata = new DocumentMetadata(3, + PubDate.toYearByte(year), + (int) -ret.details.quality, + EnumSet.of(DocumentFlags.GeneratorForum)); + ret.details.features = EnumSet.of(HtmlFeature.JS, HtmlFeature.TRACKING); + + ret.details.metadata.withSizeAndTopology(10000, score); + + ret.details.generator = GeneratorType.DOCS; + ret.details.title = StringUtils.truncate(STR."[/r/\{subreddit}] \{title}", 128); + ret.details.description = StringUtils.truncate(body, 255); + ret.details.length = 128; + + ret.details.standard = HtmlStandard.HTML5; + ret.details.feedLinks = List.of(); + ret.details.linksExternal = List.of(); + ret.details.linksInternal = List.of(); + ret.state = UrlIndexingState.OK; + ret.stateReason = "SIDELOAD"; + } + catch (Exception e) { + 
logger.warn("Failed to process document", e); + ret.url = new EdgeUrl(fullUrl); + ret.state = UrlIndexingState.DISQUALIFIED; + ret.stateReason = "SIDELOAD"; + } + return ret; + }; +} diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java deleted file mode 100644 index 5ba88432..00000000 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloadFactory.java +++ /dev/null @@ -1,57 +0,0 @@ -package nu.marginalia.converting.sideload.warc; - -import com.google.inject.Inject; -import nu.marginalia.converting.sideload.SideloadSource; -import nu.marginalia.converting.sideload.SideloaderProcessing; - -import java.io.IOException; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; - -public class WarcSideloadFactory { - - private final SideloaderProcessing processing; - - @Inject - public WarcSideloadFactory(SideloaderProcessing processing) { - this.processing = processing; - } - - public Collection createSideloaders(Path pathToWarcFiles) throws IOException { - - if (Files.isRegularFile(pathToWarcFiles)) { - return List.of(new WarcSideloader(pathToWarcFiles, processing)); - } - else if (Files.isDirectory(pathToWarcFiles)) { - - final List files = new ArrayList<>(); - - try (var stream = Files.list(pathToWarcFiles)) { - stream - .filter(Files::isRegularFile) - .filter(this::isWarcFile) - .forEach(files::add); - - } - - List sources = new ArrayList<>(); - - for (Path file : files) { - sources.add(new WarcSideloader(file, processing)); - } - - return sources; - } - else { - throw new IllegalArgumentException("Path " + pathToWarcFiles + " is neither a file nor a directory"); - } - } - - private boolean isWarcFile(Path path) { - return 
path.toString().endsWith(".warc") - || path.toString().endsWith(".warc.gz"); - } -} \ No newline at end of file diff --git a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java index 97406ff0..edb670fa 100644 --- a/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java +++ b/code/processes/converting-process/src/main/java/nu/marginalia/converting/sideload/warc/WarcSideloader.java @@ -35,9 +35,9 @@ public class WarcSideloader implements SideloadSource, AutoCloseable { private final EdgeDomain domain; + @SneakyThrows public WarcSideloader(Path warcFile, SideloaderProcessing sideloaderProcessing) - throws IOException { this.sideloaderProcessing = sideloaderProcessing; this.reader = new WarcReader(warcFile); diff --git a/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java new file mode 100644 index 00000000..5d7e25b4 --- /dev/null +++ b/code/processes/converting-process/src/test/java/nu/marginalia/converting/sideload/reddit/RedditSideloaderTest.java @@ -0,0 +1,55 @@ +package nu.marginalia.converting.sideload.reddit; + +import com.google.inject.AbstractModule; +import com.google.inject.Guice; +import nu.marginalia.ProcessConfiguration; +import nu.marginalia.converting.ConverterModule; +import nu.marginalia.converting.processor.ConverterDomainTypes; +import nu.marginalia.converting.sideload.SideloadSourceFactory; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.UUID; + +import static org.mockito.Mockito.when; + +class 
RedditSideloaderTest extends AbstractModule { + /* This test case exists for debugging, to get deep into the Reddit sideloader and see if it can read the files. + * Update the path to the Reddit database in the dbPath variable. + * */ + private static final Path dbPath = Path.of("/home/vlofgren/Code/RemoteEnv/local/index-1/uploads/reddit/"); + + private SideloadSourceFactory sourceFactory; + @BeforeEach + public void setUp() throws IOException { + sourceFactory = Guice.createInjector(new ConverterModule(), this) + .getInstance(SideloadSourceFactory.class); + } + + public void configure() { + var domainTypesMock = Mockito.mock(ConverterDomainTypes.class); + when(domainTypesMock.isBlog(Mockito.any())).thenReturn(false); + bind(ProcessConfiguration.class).toInstance(new ProcessConfiguration("test", 1, UUID.randomUUID())); + bind(ConverterDomainTypes.class).toInstance(domainTypesMock); + } + + @Test + void getDocumentsStream() throws IOException { + if (Files.notExists(dbPath)) { + return; + } + + var sideloader = sourceFactory.sideloadReddit(dbPath); + + sideloader.getDomain(); + var stream = sideloader.getDocumentsStream(); + for (int i = 0; i < 10; i++) { + var next = stream.next(); + System.out.println(next); + } + } +} \ No newline at end of file diff --git a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java index cf8acaf4..05de9b07 100644 --- a/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java +++ b/code/services-core/control-service/src/main/java/nu/marginalia/control/node/svc/ControlNodeActionsService.java @@ -58,6 +58,9 @@ public class ControlNodeActionsService { Spark.post("/public/nodes/:node/actions/sideload-dirtree", this::sideloadDirtree, redirectControl.renderRedirectAcknowledgement("Sideloading", "..") ); + 
Spark.post("/public/nodes/:node/actions/sideload-reddit", this::sideloadReddit, + redirectControl.renderRedirectAcknowledgement("Sideloading", "..") + ); Spark.post("/public/nodes/:node/actions/sideload-warc", this::sideloadWarc, redirectControl.renderRedirectAcknowledgement("Sideloading", "..") ); @@ -141,7 +144,18 @@ public class ControlNodeActionsService { return ""; } + public Object sideloadReddit(Request request, Response response) { + final int nodeId = Integer.parseInt(request.params("node")); + + Path sourcePath = parseSourcePath(request.queryParams("source")); + + eventLog.logEvent("USER-ACTION", "SIDELOAD REDDIT " + nodeId); + + executorClient.sideloadReddit(nodeId, sourcePath); + + return ""; + } public Object sideloadWarc(Request request, Response response) { final int nodeId = Integer.parseInt(request.params("node")); diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb new file mode 100644 index 00000000..764be316 --- /dev/null +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb @@ -0,0 +1,36 @@ +

Sideload Reddit

+ +
+This will index a pushshift.io dump from the Reddit API into index. +
+
+
+ + + {{#each uploadDirContents.items}} + + + + + + + {{/each}} + {{#unless uploadDirContents.items}} + + + + {{/unless}} +
FilenameSizeLast Modified
+ + {{#unless directory}}{{size}}{{/unless}}{{lastModifiedTime}}
Nothing found in upload directory
+ +

+ + The upload directory is typically mounted to /uploads on the server. The external + directory is typically something like index-{{node.id}}/uploads. + +

+ + +
+
\ No newline at end of file diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb index ea4502fa..df8ed77f 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/node-actions.hdb @@ -19,6 +19,7 @@ {{#if view.sideload-stackexchange}} {{> control/node/actions/partial-sideload-stackexchange }} {{/if}} {{#if view.sideload-warc}} {{> control/node/actions/partial-sideload-warc }} {{/if}} {{#if view.sideload-dirtree}} {{> control/node/actions/partial-sideload-dirtree }} {{/if}} + {{#if view.sideload-reddit}} {{> control/node/actions/partial-sideload-reddit }} {{/if}} {{#if view.export-db-data}} {{> control/node/actions/partial-export-db-data }} {{/if}} {{#if view.export-from-crawl-data}} {{> control/node/actions/partial-export-from-crawl-data }} {{/if}} {{#if view.export-sample-data}} {{> control/node/actions/partial-export-sample-data }} {{/if}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb index de963ea3..23627155 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/partial-node-nav.hdb @@ -24,6 +24,7 @@
  • Sideload Stackexchange
  • Sideload WARC Files
  • Sideload Dirtree
  • +
  • Sideload Reddit
  • Download Sample Crawl Data
  • Export Database Data
  • diff --git a/code/services-core/executor-service/build.gradle b/code/services-core/executor-service/build.gradle index e903d048..f2d9678d 100644 --- a/code/services-core/executor-service/build.gradle +++ b/code/services-core/executor-service/build.gradle @@ -41,6 +41,7 @@ dependencies { implementation project(':code:features-crawl:link-parser') implementation project(':code:features-convert:data-extractors') implementation project(':code:features-convert:stackexchange-xml') + implementation project(':code:features-convert:reddit-json') implementation project(':code:features-index:index-journal') implementation project(':code:api:index-api') implementation project(':code:api:query-api') diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java index aed6d05a..ee9fdba0 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/actor/task/ConvertActor.java @@ -10,6 +10,7 @@ import nu.marginalia.actor.state.Resume; import nu.marginalia.encyclopedia.EncyclopediaConverter; import nu.marginalia.process.ProcessOutboxes; import nu.marginalia.process.ProcessService; +import nu.marginalia.sideload.RedditSideloadHelper; import nu.marginalia.sideload.SideloadHelper; import nu.marginalia.sideload.StackExchangeSideloadHelper; import nu.marginalia.storage.FileStorageService; @@ -38,6 +39,7 @@ public class ConvertActor extends RecordActorPrototype { public record PredigestEncyclopedia(String source, String dest, String baseUrl) implements ActorStep {}; public record ConvertDirtree(String source) implements ActorStep {}; public record ConvertWarc(String source) implements ActorStep {}; + public record ConvertReddit(String source) implements ActorStep {}; public record ConvertStackexchange(String source) implements ActorStep 
{}; @Resume(behavior = ActorResumeBehavior.RETRY) public record ConvertWait(FileStorageId destFid, @@ -96,6 +98,28 @@ public class ConvertActor extends RecordActorPrototype { mqConverterOutbox.sendAsync(ConvertRequest.forWarc(sourcePath, processedArea.id())) ); } + case ConvertReddit(String source) -> { + Path sourcePath = Path.of(source); + if (!Files.exists(sourcePath)) + yield new Error("Source path does not exist: " + sourcePath); + + String fileName = sourcePath.toFile().getName(); + + var processedArea = storageService.allocateStorage( + FileStorageType.PROCESSED_DATA, "processed-data", + "Processed Reddit Data; " + fileName); + + storageService.setFileStorageState(processedArea.id(), FileStorageState.NEW); + + // Convert reddit data to sqlite database + // (we can't use a Predigest- step here because the conversion is too complicated) + RedditSideloadHelper.convertRedditData(sourcePath); + + yield new ConvertWait( + processedArea.id(), + mqConverterOutbox.sendAsync(ConvertRequest.forReddit(sourcePath, processedArea.id())) + ); + } case ConvertEncyclopedia(String source, String baseUrl) -> { Path sourcePath = Path.of(source); diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java index 0816c51b..e0969196 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/ExecutorGrpcService.java @@ -162,6 +162,18 @@ public class ExecutorGrpcService extends ExecutorApiGrpc.ExecutorApiImplBase { } } + @Override + public void sideloadReddit(RpcSideloadReddit request, StreamObserver responseObserver) { + try { + sideloadService.sideloadReddit(request); + responseObserver.onNext(Empty.getDefaultInstance()); + responseObserver.onCompleted(); + } + catch (Exception e) { + 
responseObserver.onError(e); + } + } + @Override public void sideloadWarc(RpcSideloadWarc request, StreamObserver responseObserver) { try { diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java index 7daefe6e..eae236c6 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java @@ -32,6 +32,12 @@ public class SideloadService { ); } + public void sideloadReddit(RpcSideloadReddit request) throws Exception { + actorControlService.startFrom(ExecutorActor.CONVERT, + new ConvertActor.ConvertReddit(request.getSourcePath()) + ); + } + public void sideloadWarc(RpcSideloadWarc request) throws Exception { actorControlService.startFrom(ExecutorActor.CONVERT, new ConvertActor.ConvertWarc(request.getSourcePath()) diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java b/code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java new file mode 100644 index 00000000..23d7e544 --- /dev/null +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/sideload/RedditSideloadHelper.java @@ -0,0 +1,104 @@ +package nu.marginalia.sideload; + +import nu.marginalia.integration.reddit.db.RedditDb; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardCopyOption; +import java.util.*; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ForkJoinPool; +import java.util.stream.Collectors; + +/** Contains helper functions for pre-converting reddit zstd dump + * files to
marginalia-digestible sqlite databases*/ +public class RedditSideloadHelper { + private static final Logger logger = LoggerFactory.getLogger(RedditSideloadHelper.class); + + /** Looks for reddit zstd dump files in the given path and converts them to sqlite databases. + * The function is idempotent, so it is safe to call it multiple times on the same path + * (it will not re-convert files that have already been successfully converted) + * */ + public static void convertRedditData(Path sourcePath) { + if (!Files.isDirectory(sourcePath)) { + throw new UnsupportedOperationException("RedditSideloadHelper.convertRedditData only supports directories"); + } + + Set allFileNames; + try (var contents = Files.list(sourcePath)) { + allFileNames = contents.filter(Files::isRegularFile) + .map(Path::toFile) + .map(File::getName) + .filter(name -> name.endsWith(".zst")) + .sorted() + .collect(Collectors.toCollection(TreeSet::new)); + } catch (IOException ex) { + logger.warn("Failed to convert reddit zstd file to sqlite database", ex); + return; + } + + int parallelism = Math.clamp(ForkJoinPool.getCommonPoolParallelism(), 1, Runtime.getRuntime().availableProcessors() / 2); + try (var executor = Executors.newWorkStealingPool(parallelism)) + { + for (var fileName : allFileNames) { + if (!fileName.endsWith(RedditFilePair.submissionsSuffix)) { + continue; + } + + String baseName = fileName.substring(0, fileName.length() - RedditFilePair.submissionsSuffix.length()); + String commentsFileName = baseName + RedditFilePair.commentsSuffix; + + if (!allFileNames.contains(commentsFileName)) { + logger.warn("Skipping reddit file pair {} because it is missing the comments file", fileName); + return; + } + + executor.submit(() -> convertSingleRedditFile(new RedditFilePair(sourcePath, baseName))); + } + } + } + + record RedditFilePair(Path rootDir, String fileNameBase) { + static String submissionsSuffix = "_submissions.zst"; + static String commentsSuffix = "_comments.zst"; + + public String
submissionsFileName() { return fileNameBase + submissionsSuffix; } + public String commentsFileName() { return fileNameBase + commentsSuffix; } + + public Path submissionsPath() { return rootDir.resolve(submissionsFileName()); } + public Path commentsPath() { return rootDir.resolve(commentsFileName()); } + } + + private static void convertSingleRedditFile(RedditFilePair files) { + try { + Path destPath = getRedditDbPath(files); + if (Files.exists(destPath)) // already converted + return; + + Path tempFile = Files.createTempFile(destPath.getParent(), "processed", "db.tmp"); + try { + logger.info("Converting reddit zstd file {} to sqlite database", files.fileNameBase); + RedditDb.create(files.submissionsPath(), files.commentsPath(), tempFile); + logger.info("Finished converting reddit zstd file {} to sqlite database", files.fileNameBase); + Files.move(tempFile, destPath, StandardCopyOption.REPLACE_EXISTING); + } catch (Exception e) { + logger.error("Failed to convert reddit zstd file to sqlite database", e); + Files.deleteIfExists(tempFile); + Files.deleteIfExists(destPath); + } + } catch (IOException ex) { + logger.warn("Failed to convert reddit zstd file to sqlite database", ex); + } + } + + private static Path getRedditDbPath(RedditFilePair pair) throws IOException { + String hash = SideloadHelper.getCrc32FileHash(pair.commentsPath()); + return pair.rootDir().resolve(STR."\{pair.fileNameBase}.\{hash}.db"); + } + +} \ No newline at end of file diff --git a/settings.gradle b/settings.gradle index dcec22b4..af0cad4f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -37,6 +37,7 @@ include 'code:features-convert:adblock' include 'code:features-convert:anchor-keywords' include 'code:features-convert:data-extractors' include 'code:features-convert:stackexchange-xml' +include 'code:features-convert:reddit-json' include 'code:features-convert:pubdate' include 'code:features-convert:summary-extraction' include 'code:features-convert:keyword-extraction' From 
8f91156d80c802831b0e6ea2b32e737a908cf791 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Wed, 14 Feb 2024 18:38:20 +0100 Subject: [PATCH 2/3] (control) Improve sideload UX The sideload forms didn't properly set the label 'for' property, meaning that while label tags existed, they weren't appropriately clickable. Also removed unnecessary limits on the sideload target being a directory for stackexchange and warc. It's been possible to directly load a particular file for a while, but not allowed due to GUI limits. --- .../control/node/actions/partial-sideload-dirtree.hdb | 2 +- .../control/node/actions/partial-sideload-encyclopedia.hdb | 2 +- .../control/node/actions/partial-sideload-stackexchange.hdb | 6 +++--- .../control/node/actions/partial-sideload-warc.hdb | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb index ac65e966..bcedeaa7 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb @@ -12,7 +12,7 @@ for more information on how to set this up. 
- + {{#unless directory}}{{size}}{{/unless}} {{lastModifiedTime}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb index 62bbce9a..124f8a19 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb @@ -22,7 +22,7 @@ - + {{#unless directory}}{{size}}{{/unless}} {{lastModifiedTime}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb index 701fbb51..8297ae68 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb @@ -13,9 +13,9 @@ information how to do this. FilenameSizeLast Modified {{#each uploadDirContents.items}} - - - + + + {{#unless directory}}{{size}}{{/unless}} {{lastModifiedTime}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb index 587c02c0..7871ca54 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb @@ -12,9 +12,9 @@ A warc export can be created using e.g. wget:

    FilenameSizeLast Modified {{#each uploadDirContents.items}} - - - + + + {{#unless directory}}{{size}}{{/unless}} {{lastModifiedTime}} From 8021bd0aaefc96d166fa2210b54a4719cb91b8f2 Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Thu, 15 Feb 2024 09:13:40 +0100 Subject: [PATCH 3/3] (control) Sort upload listing results Improve the UX of the sideload GUI by sorting the results in a sensible fashion, first by whether it's a directory, then by its filename. It also switches the timestamp rendering to a more human-readable format than full ISO-8601. --- .../control/node/actions/partial-sideload-dirtree.hdb | 2 +- .../control/node/actions/partial-sideload-encyclopedia.hdb | 2 +- .../control/node/actions/partial-sideload-reddit.hdb | 4 ++-- .../control/node/actions/partial-sideload-stackexchange.hdb | 2 +- .../control/node/actions/partial-sideload-warc.hdb | 2 +- .../java/nu/marginalia/executor/svc/SideloadService.java | 6 +++++- 6 files changed, 11 insertions(+), 7 deletions(-) diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb index bcedeaa7..b620bfba 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-dirtree.hdb @@ -15,7 +15,7 @@ for more information on how to set this up. 
{{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb index 124f8a19..39d5c686 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-encyclopedia.hdb @@ -25,7 +25,7 @@ {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb index 764be316..bf9f36b3 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-reddit.hdb @@ -1,7 +1,7 @@

    Sideload Reddit

    -This will index a pushshift.io dump from the Reddit API into index. +This will index a pushshift.io "top-n subreddits" dump from the Reddit API into index.
    @@ -14,7 +14,7 @@ This will index a pushshift.io dump from the Reddit API into index. {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb index 8297ae68..f5f73f84 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-stackexchange.hdb @@ -18,7 +18,7 @@ information how to do this. {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb index 7871ca54..7680b7b8 100644 --- a/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb +++ b/code/services-core/control-service/src/main/resources/templates/control/node/actions/partial-sideload-warc.hdb @@ -17,7 +17,7 @@ A warc export can be created using e.g. wget:

    {{#unless directory}}{{size}}{{/unless}} - {{lastModifiedTime}} + {{shortTimestamp lastModifiedTime}} {{/each}} {{#unless uploadDirContents.items}} diff --git a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java index eae236c6..136b6f1d 100644 --- a/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java +++ b/code/services-core/executor-service/src/main/java/nu/marginalia/executor/svc/SideloadService.java @@ -16,6 +16,7 @@ import java.nio.file.Path; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; +import java.util.Comparator; public class SideloadService { private final ExecutorActorControlService actorControlService; @@ -61,7 +62,10 @@ public class SideloadService { public RpcUploadDirContents listUploadDir() throws IOException { Path uploadDir = WmsaHome.getUploadDir(); - try (var items = Files.list(uploadDir)) { + try (var items = Files.list(uploadDir).sorted( + Comparator.comparing((Path d) -> Files.isDirectory(d)).reversed() + .thenComparing(path -> path.getFileName().toString()) + )) { var builder = RpcUploadDirContents.newBuilder().setPath(uploadDir.toString()); var iter = items.iterator();