Merge pull request #48 from MarginaliaSearch/parquet

Converter-Loader communicates via Parquet files
This commit is contained in:
Viktor 2023-09-15 13:32:06 +02:00 committed by GitHub
commit 46232c7fd4
148 changed files with 4648 additions and 2075 deletions

View File

@ -22,10 +22,13 @@ To set up a local test environment, follow the instructions in [📄 run/readme.
## Hardware Requirements
A production-like environment requires at least 128 Gb of RAM and ideally 2 Tb+ of enterprise
grade SSD storage, as well as some additional terabytes of slower harddrives for storing crawl
A production-like environment requires a lot of RAM and ideally enterprise SSDs for
the index, as well as some additional terabytes of slower hard drives for storing crawl
data. It can be made to run on smaller hardware by limiting the size of the index.
The system will definitely run on a 32 Gb machine, possibly smaller, but at that size it may not perform
very well, as it relies on disk caching to be fast.
A local developer's deployment is possible with much smaller hardware (and index size).
## Project Structure

View File

@ -0,0 +1,11 @@
The link database contains information about links,
such as their ID, their URL, their title, their description,
and so forth.
The link database is a SQLite file. The reason this information
is not kept in the MariaDB database is that doing so would make updates to
this information take effect in production immediately, even before
the updated documents were searchable in the index.
It is constructed by the [loading-process](../../processes/loading-process), and consumed
by the [search-service](../../services-core/search-service).
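As a rough illustration of how a consumer might read this file, the sketch below opens it over JDBC. The file name, table name, and column names (`links.db`, `DOCUMENT`, `ID`/`URL`/`TITLE`/`DESCRIPTION`) are hypothetical stand-ins for whatever schema the loading-process actually writes, and a SQLite JDBC driver is assumed to be on the classpath.
```java
import java.sql.DriverManager;

public class LinkDbExample {
    public static void main(String[] args) throws Exception {
        // Hypothetical file and schema names, for illustration only
        try (var conn = DriverManager.getConnection("jdbc:sqlite:links.db");
             var stmt = conn.createStatement();
             var rs = stmt.executeQuery("SELECT ID, URL, TITLE, DESCRIPTION FROM DOCUMENT LIMIT 10"))
        {
            while (rs.next()) {
                System.out.printf("%d\t%s\t%s%n",
                        rs.getLong("ID"), rs.getString("URL"), rs.getString("TITLE"));
            }
        }
    }
}
```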

View File

@ -1,6 +1,6 @@
package nu.marginalia.converting.model;
package nu.marginalia.model.html;
// This class really doesn't belong anywhere, but will squat here for now
public enum HtmlStandard {
PLAIN(0, 1),
UNKNOWN(0, 1),

View File

@ -15,7 +15,6 @@ java {
dependencies {
implementation project(':code:common:model')
implementation project(':code:process-models:converting-model')
implementation libs.lombok
annotationProcessor libs.lombok

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
public class PubDateFromHtmlStandard {
/** Used to bias pub date heuristics */

View File

@ -2,7 +2,7 @@ package nu.marginalia.pubdate;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import org.jsoup.nodes.Document;
import java.util.Optional;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import java.time.DateTimeException;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.pubdate.heuristic.*;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateEffortLevel;
import nu.marginalia.pubdate.PubDateFromHtmlStandard;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateEffortLevel;
import nu.marginalia.pubdate.PubDateHeuristic;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -5,7 +5,7 @@ import com.google.gson.GsonBuilder;
import com.google.gson.JsonSyntaxException;
import com.google.gson.annotations.SerializedName;
import lombok.ToString;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateEffortLevel;
import nu.marginalia.pubdate.PubDateHeuristic;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateEffortLevel;
import nu.marginalia.pubdate.PubDateHeuristic;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -1,6 +1,6 @@
package nu.marginalia.pubdate.heuristic;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.pubdate.PubDateHeuristic;
import nu.marginalia.pubdate.PubDateParser;

View File

@ -2,7 +2,7 @@ package nu.marginalia.pubdate;
import nu.marginalia.WmsaHome;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.pubdate.heuristic.PubDateHeuristicDOMParsingPass2;
import org.jsoup.Jsoup;
import org.junit.jupiter.api.Test;

View File

@ -1,49 +0,0 @@
# Converting Models
Contains models shared by the [converting-process](../../processes/converting-process/) and
[loading-process](../../processes/loading-process/).
## Design
The two processes communicate through a file-based protocol. The converter serializes [instructions](src/main/java/nu/marginalia/converting/instruction/Instruction.java)
to a file, which are deserialized by the loader and fed into an [interpreter](src/main/java/nu/marginalia/converting/instruction/Interpreter.java).
The instructions implement a visitor pattern.
Conceptually the pattern can be thought of a bit like remote function calls over a file,
or a crude instruction-based programming language.
This
```java
producer.foo("cat");
producer.bar("milk", "eggs", "bread");
```
translates through this paradigm, to this:
```
(producer)
writeInstruction(DoFoo("Cat"))
writeInstruction(DoBar("Milk", "Eggs", "Bread"))
(consumer)
while read instruction:
interpreter.apply(instruction)
(Interpreter)
doFoo(animal):
...
doBar(ingredients):
...
(doFoo)
DoFoo(animal):
apply(interpreter):
interpreter.foo(animal)
(doBar)
DoBar(ingredients):
apply(interpreter):
interpreter.bar(ingredients)
```
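In concrete Java terms, a minimal sketch of the same round trip might look like the following, using the LoadDomain instruction and the Interpreter interface from this package (serialization to and from file elided; the EdgeDomain constructor taking a host name is an assumption here):
```java
// The consumer side only overrides the callbacks it cares about;
// Interpreter's methods have empty default implementations.
Interpreter consumer = new Interpreter() {
    @Override
    public void loadDomain(EdgeDomain[] domain) {
        System.out.println("Loading " + domain.length + " domain(s)");
    }
};

// The producer side emits instructions instead of calling the consumer directly.
Instruction instruction = new LoadDomain(new EdgeDomain("www.marginalia.nu"));

if (!instruction.isNoOp()) {
    instruction.apply(consumer);  // dispatches to consumer.loadDomain(...)
}
```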

View File

@ -1,10 +0,0 @@
package nu.marginalia.converting.instruction;
import java.io.Serializable;
public interface Instruction extends Serializable {
void apply(Interpreter interpreter);
boolean isNoOp();
InstructionTag tag();
}

View File

@ -1,25 +0,0 @@
package nu.marginalia.converting.instruction;
import nu.marginalia.converting.instruction.instructions.*;
public enum InstructionTag {
DOMAIN(LoadDomain.class),
LINK(LoadDomainLink.class),
REDIRECT(LoadDomainRedirect.class),
WORDS(LoadKeywords.class),
PROC_DOCUMENT(LoadProcessedDocument.class),
PROC_DOCUMENT_ERR(LoadProcessedDocumentWithError.class),
PROC_DOMAIN(LoadProcessedDomain.class),
DOMAIN_METADATA(LoadDomainMetadata.class),
RSS(LoadRssFeed.class);
public final Class<? extends Instruction> clazz;
InstructionTag(Class<? extends Instruction> clazz) {
this.clazz = clazz;
}
}

View File

@ -1,26 +0,0 @@
package nu.marginalia.converting.instruction;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
public interface Interpreter {
default void loadDomain(EdgeDomain[] domain) {}
default void loadRssFeed(EdgeUrl[] rssFeed) {}
default void loadDomainLink(DomainLink[] links) {}
default void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {}
default void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {}
default void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {}
default void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {}
default void loadDomainRedirect(DomainLink link) {}
default void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {}
}

View File

@ -1,8 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.model.EdgeDomain;
import java.io.Serializable;
public record DomainLink(EdgeDomain from, EdgeDomain to) implements Serializable {
}

View File

@ -1,31 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeDomain;
import java.util.Arrays;
public record LoadDomain(EdgeDomain... domain) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadDomain(domain);
}
@Override
public boolean isNoOp() {
return domain.length == 0;
}
@Override
public InstructionTag tag() {
return InstructionTag.DOMAIN;
}
@Override
public String toString() {
return getClass().getSimpleName()+"["+Arrays.toString(domain)+"]";
}
}

View File

@ -1,31 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import java.util.Arrays;
public record LoadDomainLink(DomainLink... links) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadDomainLink(links);
}
@Override
public String toString() {
return getClass().getSimpleName()+"["+ Arrays.toString(links)+"]";
}
@Override
public InstructionTag tag() {
return InstructionTag.LINK;
}
@Override
public boolean isNoOp() {
return links.length == 0;
}
}

View File

@ -1,28 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import java.util.Arrays;
public record LoadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadDomainMetadata(domain, knownUrls, goodUrls, visitedUrls);
}
@Override
public boolean isNoOp() {
return false;
}
@Override
public InstructionTag tag() {
return InstructionTag.DOMAIN_METADATA;
}
}

View File

@ -1,29 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
public record LoadDomainRedirect(DomainLink links) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadDomainRedirect(links);
}
@Override
public String toString() {
return getClass().getSimpleName()+"["+ links+"]";
}
@Override
public InstructionTag tag() {
return InstructionTag.REDIRECT;
}
@Override
public boolean isNoOp() {
return false;
}
}

View File

@ -1,32 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeUrl;
public record LoadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadKeywords(url, ordinal, features, metadata, words);
}
@Override
public boolean isNoOp() {
return false;
}
@Override
public InstructionTag tag() {
return InstructionTag.WORDS;
}
@Override
public String toString() {
return getClass().getSimpleName()+"["+ words+"]";
}
}

View File

@ -1,37 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.model.crawl.UrlIndexingState;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeUrl;
import org.jetbrains.annotations.Nullable;
public record LoadProcessedDocument(EdgeUrl url,
int ordinal, UrlIndexingState state,
String title,
String description,
int htmlFeatures,
String standard,
int length,
long hash,
double quality,
@Nullable Integer pubYear
) implements Instruction
{
@Override
public void apply(Interpreter interpreter) {
interpreter.loadProcessedDocument(this);
}
@Override
public InstructionTag tag() {
return InstructionTag.PROC_DOCUMENT;
}
@Override
public boolean isNoOp() {
return false;
}
}

View File

@ -1,29 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.model.crawl.UrlIndexingState;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeUrl;
public record LoadProcessedDocumentWithError(EdgeUrl url,
UrlIndexingState state,
String reason,
int ordinal) implements Instruction
{
@Override
public void apply(Interpreter interpreter) {
interpreter.loadProcessedDocumentWithError(this);
}
@Override
public InstructionTag tag() {
return InstructionTag.PROC_DOCUMENT_ERR;
}
@Override
public boolean isNoOp() {
return false;
}
}

View File

@ -1,26 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeDomain;
public record LoadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadProcessedDomain(domain, state, ip);
}
@Override
public InstructionTag tag() {
return InstructionTag.PROC_DOMAIN;
}
@Override
public boolean isNoOp() {
return false;
}
}

View File

@ -1,32 +0,0 @@
package nu.marginalia.converting.instruction.instructions;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.InstructionTag;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.model.EdgeUrl;
import java.util.Arrays;
public record LoadRssFeed(EdgeUrl... feeds) implements Instruction {
@Override
public void apply(Interpreter interpreter) {
interpreter.loadRssFeed(feeds);
}
@Override
public String toString() {
return getClass().getSimpleName()+"["+ Arrays.toString(feeds)+"]";
}
@Override
public InstructionTag tag() {
return InstructionTag.RSS;
}
@Override
public boolean isNoOp() {
return feeds.length == 0;
}
}

View File

@ -11,25 +11,15 @@ java {
}
}
dependencies {
//implementation project(':third-party:monkey-patch-gson')
implementation project(':code:common:db')
implementation project(':code:common:model')
implementation project(':code:api:index-api')
implementation project(':code:common:service-discovery')
implementation project(':code:common:service-client')
implementation project(':code:libraries:language-processing')
implementation project(':code:features-convert:keyword-extraction')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation project(':third-party:parquet-floor')
implementation libs.notnull
implementation libs.trove
implementation libs.fastutil
implementation libs.bundles.parquet
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit

View File

@ -0,0 +1,18 @@
The processed-data package contains models and logic for
reading and writing parquet files with the output from the
[converting-process](../../processes/converting-process).
Main models:
* [DocumentRecord](src/main/java/nu/marginalia/model/processed/DocumentRecord.java)
  * [DocumentRecordKeywordsProjection](src/main/java/nu/marginalia/model/processed/DocumentRecordKeywordsProjection.java)
  * [DocumentRecordMetadataProjection](src/main/java/nu/marginalia/model/processed/DocumentRecordMetadataProjection.java)
* [DomainLinkRecord](src/main/java/nu/marginalia/model/processed/DomainLinkRecord.java)
* [DomainRecord](src/main/java/nu/marginalia/model/processed/DomainRecord.java)
Since Parquet is a column-based format, some of the readable models are projections
that only read parts of the input file.
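A rough usage sketch for the domain-link files, assuming a writable temporary path; the writer and reader classes are the ones defined in this package:
```java
Path file = Files.createTempFile("domain-link", ".parquet");

try (var writer = new DomainLinkRecordParquetFileWriter(file)) {
    writer.write(new DomainLinkRecord("www.marginalia.nu", "memex.marginalia.nu"));
}

// Read everything back ...
List<DomainLinkRecord> links = DomainLinkRecordParquetFileReader.stream(file).toList();

// ... or use a projection that only reads the 'dest' column
Set<String> destinations = DomainLinkRecordParquetFileReader.getDestDomainNames(file);
```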
## See Also
[third-party/parquet-floor](../../../third-party/parquet-floor)

View File

@ -0,0 +1,37 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.model.processed.DocumentRecord;
import nu.marginalia.model.processed.DocumentRecordKeywordsProjection;
import nu.marginalia.model.processed.DocumentRecordMetadataProjection;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.stream.Stream;
public class DocumentRecordParquetFileReader {
@NotNull
public static Stream<DocumentRecord> stream(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DocumentRecord.newHydrator()));
}
@NotNull
public static Stream<DocumentRecordKeywordsProjection> streamKeywordsProjection(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DocumentRecordKeywordsProjection.newHydrator()),
DocumentRecordKeywordsProjection.requiredColumns()
);
}
@NotNull
public static Stream<DocumentRecordMetadataProjection> streamMetadataProjection(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DocumentRecordMetadataProjection.newHydrator()),
DocumentRecordMetadataProjection.requiredColumns()
);
}
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.model.processed.DocumentRecord;
import java.io.IOException;
import java.nio.file.Path;
public class DocumentRecordParquetFileWriter implements AutoCloseable {
private final ParquetWriter<DocumentRecord> writer;
public DocumentRecordParquetFileWriter(Path file) throws IOException {
writer = ParquetWriter.writeFile(DocumentRecord.schema,
file.toFile(), DocumentRecord.newDehydrator());
}
public void write(DocumentRecord domainData) throws IOException {
writer.write(domainData);
}
public void close() throws IOException {
writer.close();
}
}

View File

@ -0,0 +1,31 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
public class DomainLinkRecordParquetFileReader {
@NotNull
public static Stream<DomainLinkRecord> stream(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DomainLinkRecord.newHydrator()));
}
@NotNull
public static Set<String> getDestDomainNames(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DomainLinkRecord.newDestDomainHydrator()),
List.of("dest"))
.collect(Collectors.toSet());
}
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.model.processed.DomainLinkRecord;
import java.io.IOException;
import java.nio.file.Path;
public class DomainLinkRecordParquetFileWriter implements AutoCloseable {
private final ParquetWriter<DomainLinkRecord> writer;
public DomainLinkRecordParquetFileWriter(Path file) throws IOException {
writer = ParquetWriter.writeFile(DomainLinkRecord.schema,
file.toFile(), DomainLinkRecord.newDehydrator());
}
public void write(DomainLinkRecord domainData) throws IOException {
writer.write(domainData);
}
public void close() throws IOException {
writer.close();
}
}

View File

@ -0,0 +1,30 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;
import nu.marginalia.model.processed.DomainRecord;
import org.jetbrains.annotations.NotNull;
import java.io.IOException;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Stream;
public class DomainRecordParquetFileReader {
@NotNull
public static Stream<DomainRecord> stream(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DomainRecord.newHydrator()));
}
@NotNull
public static List<String> getDomainNames(Path path) throws IOException {
return ParquetReader.streamContent(path.toFile(),
HydratorSupplier.constantly(DomainRecord.newDomainNameHydrator()),
List.of("domain"))
.toList();
}
}

View File

@ -0,0 +1,24 @@
package nu.marginalia.io.processed;
import blue.strategic.parquet.ParquetWriter;
import nu.marginalia.model.processed.DomainRecord;
import java.io.IOException;
import java.nio.file.Path;
public class DomainRecordParquetFileWriter implements AutoCloseable {
private final ParquetWriter<DomainRecord> writer;
public DomainRecordParquetFileWriter(Path file) throws IOException {
writer = ParquetWriter.writeFile(DomainRecord.schema,
file.toFile(), DomainRecord.newDehydrator());
}
public void write(DomainRecord domainData) throws IOException {
writer.write(domainData);
}
public void close() throws IOException {
writer.close();
}
}

View File

@ -0,0 +1,73 @@
package nu.marginalia.io.processed;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
public class ProcessedDataFileNames {
public static Path documentFileName(Path base, int batchNumber) {
return base.resolve(String.format("document%04d.parquet", batchNumber));
}
public static Path domainFileName(Path base, int batchNumber) {
return base.resolve(String.format("domain%04d.parquet", batchNumber));
}
public static Path domainLinkFileName(Path base, int batchNumber) {
return base.resolve(String.format("domain-link%04d.parquet", batchNumber));
}
public static List<Path> listDocumentFiles(Path base, int untilBatch) {
List<Path> ret = new ArrayList<>(untilBatch);
for (int i = 0; i < untilBatch; i++) {
Path maybe = documentFileName(base, i);
if (Files.exists(maybe)) {
ret.add(maybe);
}
}
return ret;
}
public static List<Path> listDomainFiles(Path base, int untilBatch) {
List<Path> ret = new ArrayList<>(untilBatch);
for (int i = 0; i < untilBatch; i++) {
Path maybe = domainFileName(base, i);
if (Files.exists(maybe)) {
ret.add(maybe);
}
}
return ret;
}
public static List<Path> listDomainFiles(Path base) {
List<Path> ret = new ArrayList<>();
for (int i = 0;; i++) {
Path maybe = domainFileName(base, i);
if (Files.exists(maybe)) {
ret.add(maybe);
}
else {
break;
}
}
return ret;
}
public static List<Path> listDomainLinkFiles(Path base, int untilBatch) {
List<Path> ret = new ArrayList<>(untilBatch);
for (int i = 0; i < untilBatch; i++) {
Path maybe = domainLinkFileName(base, i);
if (Files.exists(maybe)) {
ret.add(maybe);
}
}
return ret;
}
}

View File

@ -0,0 +1,171 @@
package nu.marginalia.model.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import lombok.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class DocumentRecord {
@NotNull
public String domain;
@NotNull
public String url;
public int ordinal;
@NotNull
public String state;
@Nullable
public String stateReason;
@Nullable
public String title;
@Nullable
public String description;
public int htmlFeatures;
@Nullable
public String htmlStandard;
public int length;
public long hash;
public float quality;
public long documentMetadata;
@Nullable
public Integer pubYear;
@Nullable
public List<String> words;
@Nullable
public TLongList metas;
public static Hydrator<DocumentRecord, DocumentRecord> newHydrator() {
return new DocumentDataHydrator();
}
public static Dehydrator<DocumentRecord> newDehydrator() {
return DocumentRecord::dehydrate;
}
public static MessageType schema = new MessageType(
DocumentRecord.class.getSimpleName(),
Types.required(BINARY).as(stringType()).named("domain"),
Types.required(BINARY).as(stringType()).named("url"),
Types.required(INT32).named("ordinal"),
Types.required(BINARY).as(stringType()).named("state"),
Types.optional(BINARY).as(stringType()).named("stateReason"),
Types.optional(BINARY).as(stringType()).named("title"),
Types.optional(BINARY).as(stringType()).named("description"),
Types.optional(INT32).named("htmlFeatures"),
Types.optional(BINARY).as(stringType()).named("htmlStandard"),
Types.optional(INT64).named("hash"),
Types.optional(INT64).named("documentMetadata"),
Types.optional(INT32).named("length"),
Types.optional(FLOAT).named("quality"),
Types.optional(INT32).named("pubYear"),
Types.repeated(INT64).named("wordMeta"),
Types.repeated(BINARY).as(stringType()).named("word")
);
public DocumentRecord add(String heading, Object value) {
switch (heading) {
case "domain" -> domain = (String) value;
case "url" -> url = (String) value;
case "ordinal" -> ordinal = (Integer) value;
case "htmlFeatures" -> htmlFeatures = (Integer) value;
case "length" -> length = (Integer) value;
case "pubYear" -> pubYear = (Integer) value;
case "hash" -> hash = (Long) value;
case "documentMetadata" -> documentMetadata = (Long) value;
case "quality" -> quality = (Float) value;
case "state" -> state = (String) value;
case "stateReason" -> stateReason = (String) value;
case "title" -> title = (String) value;
case "description" -> description = (String) value;
case "htmlStandard" -> htmlStandard = (String) value;
case "word" -> {
if (this.words == null)
this.words = new ArrayList<>(100);
this.words.add((String) value);
}
case "wordMeta" -> {
if (this.metas == null) {
this.metas = new TLongArrayList(100);
}
this.metas.add((long) value);
}
default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
}
return this;
}
public void dehydrate(ValueWriter valueWriter) {
valueWriter.write("domain", domain);
valueWriter.write("url", url);
valueWriter.write("ordinal", ordinal);
valueWriter.write("state", state);
if (stateReason != null)
valueWriter.write("stateReason", stateReason);
if (title != null)
valueWriter.write("title", title);
if (description != null)
valueWriter.write("description", description);
valueWriter.write("htmlFeatures", htmlFeatures);
valueWriter.write("htmlStandard", htmlStandard);
valueWriter.write("documentMetadata", documentMetadata);
valueWriter.write("length", length);
valueWriter.write("hash", hash);
valueWriter.write("quality", quality);
if (pubYear != null) {
valueWriter.write("pubYear", pubYear);
}
if (metas != null) {
valueWriter.writeList("wordMeta", metas);
}
if (words != null) {
valueWriter.writeList("word", words);
}
}
}
class DocumentDataHydrator implements Hydrator<DocumentRecord, DocumentRecord> {
@Override
public DocumentRecord start() {
return new DocumentRecord();
}
@Override
public DocumentRecord add(DocumentRecord target, String heading, Object value) {
return target.add(heading, value);
}
@Override
public DocumentRecord finish(DocumentRecord target) {
return target;
}
}

View File

@ -0,0 +1,92 @@
package nu.marginalia.model.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import lombok.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class DocumentRecordKeywordsProjection {
@NotNull
public String domain;
public int ordinal;
public int htmlFeatures;
public long documentMetadata;
public List<String> words;
public TLongList metas;
public boolean hasKeywords() {
return words != null && metas != null;
}
public static Hydrator<DocumentRecordKeywordsProjection, DocumentRecordKeywordsProjection> newHydrator() {
return new DocumentRecordKeywordsProjectionHydrator();
}
public static Collection<String> requiredColumns() {
return List.of("domain", "ordinal", "htmlFeatures", "word", "wordMeta", "documentMetadata");
}
public DocumentRecordKeywordsProjection add(String heading, Object value) {
switch (heading) {
case "domain" -> domain = (String) value;
case "ordinal" -> ordinal = (Integer) value;
case "htmlFeatures" -> htmlFeatures = (Integer) value;
case "documentMetadata" -> documentMetadata = (Long) value;
case "word" -> {
if (this.words == null)
this.words = new ArrayList<>(100);
this.words.add((String) value);
}
case "wordMeta" -> {
if (this.metas == null) {
this.metas = new TLongArrayList(100);
}
this.metas.add((long) value);
}
default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
}
return this;
}
}
class DocumentRecordKeywordsProjectionHydrator implements Hydrator<DocumentRecordKeywordsProjection, DocumentRecordKeywordsProjection> {
@Override
public DocumentRecordKeywordsProjection start() {
return new DocumentRecordKeywordsProjection();
}
@Override
public DocumentRecordKeywordsProjection add(DocumentRecordKeywordsProjection target, String heading, Object value) {
return target.add(heading, value);
}
@Override
public DocumentRecordKeywordsProjection finish(DocumentRecordKeywordsProjection target) {
return target;
}
}

View File

@ -0,0 +1,100 @@
package nu.marginalia.model.processed;
import blue.strategic.parquet.Hydrator;
import lombok.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.util.Collection;
import java.util.List;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class DocumentRecordMetadataProjection {
@NotNull
public String domain;
@NotNull
public String url;
public int ordinal;
@NotNull
public String state;
@Nullable
public String stateReason;
@Nullable
public String title;
@Nullable
public String description;
public int htmlFeatures;
@Nullable
public String htmlStandard;
public int length;
public long hash;
public float quality;
public long documentMetadata;
@Nullable
public Integer pubYear;
public static Collection<String> requiredColumns() {
return List.of("domain", "url", "ordinal", "htmlFeatures", "length", "pubYear",
"hash", "documentMetadata", "quality", "state", "stateReason",
"title", "description", "htmlStandard");
}
public DocumentRecordMetadataProjection add(String heading, Object value) {
switch (heading) {
case "domain" -> domain = (String) value;
case "url" -> url = (String) value;
case "ordinal" -> ordinal = (Integer) value;
case "htmlFeatures" -> htmlFeatures = (Integer) value;
case "length" -> length = (Integer) value;
case "pubYear" -> pubYear = (Integer) value;
case "hash" -> hash = (Long) value;
case "documentMetadata" -> documentMetadata = (Long) value;
case "quality" -> quality = (Float) value;
case "state" -> state = (String) value;
case "stateReason" -> stateReason = (String) value;
case "title" -> title = (String) value;
case "description" -> description = (String) value;
case "htmlStandard" -> htmlStandard = (String) value;
default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
}
return this;
}
public static Hydrator<DocumentRecordMetadataProjection, DocumentRecordMetadataProjection> newHydrator() {
return new DocumentRecordMetadataHydrator();
}
}
class DocumentRecordMetadataHydrator implements Hydrator<DocumentRecordMetadataProjection, DocumentRecordMetadataProjection> {
@Override
public DocumentRecordMetadataProjection start() {
return new DocumentRecordMetadataProjection();
}
@Override
public DocumentRecordMetadataProjection add(DocumentRecordMetadataProjection target, String heading, Object value) {
return target.add(heading, value);
}
@Override
public DocumentRecordMetadataProjection finish(DocumentRecordMetadataProjection target) {
return target;
}
}

View File

@ -0,0 +1,97 @@
package nu.marginalia.model.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import lombok.*;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Types;
import org.jetbrains.annotations.NotNull;
import static org.apache.parquet.schema.LogicalTypeAnnotation.stringType;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class DomainLinkRecord {
@NotNull
public String source;
@NotNull
public String dest;
public void dehydrate(ValueWriter valueWriter) {
valueWriter.write("source", source);
valueWriter.write("dest", dest);
}
public static Dehydrator<DomainLinkRecord> newDehydrator() {
return DomainLinkRecord::dehydrate;
}
public static Hydrator<DomainLinkRecord, DomainLinkRecord> newHydrator() {
return new DomainLinkDataHydrator();
}
public static Hydrator<String, String> newDestDomainHydrator() {
return new DestDomainNameHydrator();
}
public static MessageType schema = new MessageType(
DomainLinkRecord.class.getSimpleName(),
Types.required(BINARY).as(stringType()).named("source"),
Types.required(BINARY).as(stringType()).named("dest")
);
public DomainLinkRecord add(String heading, Object value) {
switch (heading) {
case "source" -> source = (String) value;
case "dest" -> dest = (String) value;
default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
}
return this;
}
}
class DomainLinkDataHydrator implements Hydrator<DomainLinkRecord, DomainLinkRecord> {
@Override
public DomainLinkRecord start() {
return new DomainLinkRecord();
}
@Override
public DomainLinkRecord add(DomainLinkRecord target, String heading, Object value) {
return target.add(heading, value);
}
@Override
public DomainLinkRecord finish(DomainLinkRecord target) {
return target;
}
}
class DestDomainNameHydrator implements Hydrator<String, String> {
@Override
public String start() {
return "";
}
@Override
public String add(String target, String heading, Object value) {
if ("dest".equals(heading)) {
return (String) value;
}
return target;
}
@Override
public String finish(String target) {
return target;
}
}

View File

@ -0,0 +1,146 @@
package nu.marginalia.model.processed;
import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.ValueWriter;
import lombok.*;
import org.apache.parquet.schema.*;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import java.sql.Array;
import java.util.ArrayList;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.*;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
@ToString
public class DomainRecord {
@NotNull
public String domain;
public int knownUrls;
public int goodUrls;
public int visitedUrls;
@Nullable
public String state;
@Nullable
public String redirectDomain;
@Nullable
public String ip;
public List<String> rssFeeds;
public static Hydrator<DomainRecord, DomainRecord> newHydrator() {
return new DomainHydrator();
}
public static Dehydrator<DomainRecord> newDehydrator() {
return DomainRecord::dehydrate;
}
public static Hydrator<String, String> newDomainNameHydrator() {
return new DomainNameHydrator();
}
public static MessageType schema = new MessageType(
DomainRecord.class.getSimpleName(),
Types.required(BINARY).as(stringType()).named("domain"),
Types.optional(INT32).named("knownUrls"),
Types.optional(INT32).named("visitedUrls"),
Types.optional(INT32).named("goodUrls"),
Types.required(BINARY).as(stringType()).named("state"),
Types.optional(BINARY).as(stringType()).named("redirectDomain"),
Types.optional(BINARY).as(stringType()).named("ip"),
Types.repeated(BINARY).as(stringType()).named("rss")
);
DomainRecord add(String heading, Object value) {
switch (heading) {
case "domain" -> domain = (String) value;
case "knownUrls" -> knownUrls = (Integer) value;
case "visitedUrls" -> visitedUrls = (Integer) value;
case "goodUrls" -> goodUrls = (Integer) value;
case "state" -> state = (String) value;
case "redirectDomain" -> redirectDomain = (String) value;
case "ip" -> ip = (String) value;
case "rss" -> {
if (rssFeeds == null) {
rssFeeds = new ArrayList<>();
}
rssFeeds.add((String) value);
}
default -> throw new UnsupportedOperationException("Unknown heading '" + heading + "'");
}
return this;
}
private void dehydrate(ValueWriter valueWriter) {
valueWriter.write("domain", domain);
valueWriter.write("knownUrls", knownUrls);
valueWriter.write("goodUrls", goodUrls);
valueWriter.write("visitedUrls", visitedUrls);
if (state != null) {
valueWriter.write("state", state);
}
if (redirectDomain != null) {
valueWriter.write("redirectDomain", redirectDomain);
}
if (ip != null) {
valueWriter.write("ip", ip);
}
if (rssFeeds != null) {
valueWriter.writeList("rss", rssFeeds);
}
}
}
class DomainHydrator implements Hydrator<DomainRecord, DomainRecord> {
@Override
public DomainRecord start() {
return new DomainRecord();
}
@Override
public DomainRecord add(DomainRecord target, String heading, Object value) {
return target.add(heading, value);
}
@Override
public DomainRecord finish(DomainRecord target) {
return target;
}
}
class DomainNameHydrator implements Hydrator<String, String> {
@Override
public String start() {
return "";
}
@Override
public String add(String target, String heading, Object value) {
if ("domain".equals(heading)) {
return (String) value;
}
return target;
}
@Override
public String finish(String target) {
return target;
}
}

View File

@ -0,0 +1,92 @@
package nu.marginalia.io.processed;
import gnu.trove.list.array.TLongArrayList;
import nu.marginalia.model.processed.DocumentRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import static org.junit.jupiter.api.Assertions.*;
class DocumentRecordParquetFileReaderTest {
Path parquetFile;
@BeforeEach
public void setUp() throws IOException {
parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(parquetFile);
}
@Test
public void test() throws IOException {
var doc = new DocumentRecord(
"www.marginalia.nu",
"https://www.marginalia.nu/",
0,
"OK",
null,
"Itsa me, Marginalia!",
"Hello World",
3,
"HTML5",
123,
0xF00BA3L,
0.25f,
4L,
null,
List.of("Hello", "world"),
new TLongArrayList(new long[] { 2, 3})
);
try (var writer = new DocumentRecordParquetFileWriter(parquetFile)) {
writer.write(doc);
}
var read = DocumentRecordParquetFileReader.stream(parquetFile).toList();
assertEquals(List.of(doc), read);
}
@Test
public void testHugePayload() throws IOException {
List<String> words = IntStream.range(0, 100000).mapToObj(Integer::toString).toList();
TLongArrayList metas = new TLongArrayList(LongStream.range(0, 100000).toArray());
var doc = new DocumentRecord(
"www.marginalia.nu",
"https://www.marginalia.nu/",
0,
"OK",
null,
"Itsa me, Marginalia!",
"Hello World",
3,
"HTML5",
123,
0xF00BA3L,
0.25f,
5L,
null,
words,
metas
);
try (var writer = new DocumentRecordParquetFileWriter(parquetFile)) {
writer.write(doc);
}
var read = DocumentRecordParquetFileReader.stream(parquetFile).toList();
assertEquals(List.of(doc), read);
}
}

View File

@ -0,0 +1,49 @@
package nu.marginalia.io.processed;
import nu.marginalia.model.processed.DomainLinkRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
class DomainLinkRecordParquetFileReaderTest {
Path parquetFile;
@BeforeEach
public void setUp() throws IOException {
parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(parquetFile);
}
@Test
public void testReadFull() throws IOException {
var first = new DomainLinkRecord(
"www.marginalia.nu",
"memex.marginalia.nu");
var second = new DomainLinkRecord(
"memex.marginalia.nu",
"search.marginalia.nu"
);
try (var writer = new DomainLinkRecordParquetFileWriter(parquetFile)) {
writer.write(first);
writer.write(second);
}
var items = DomainLinkRecordParquetFileReader
.stream(parquetFile)
.toList();
assertEquals(List.of(first, second), items);
}
}

View File

@ -0,0 +1,65 @@
package nu.marginalia.io.processed;
import nu.marginalia.model.processed.DomainRecord;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import static org.junit.jupiter.api.Assertions.*;
class DomainRecordParquetFileReaderTest {
Path parquetFile;
@BeforeEach
public void setUp() throws IOException {
parquetFile = Files.createTempFile(getClass().getSimpleName(), ".parquet");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(parquetFile);
}
@Test
public void testReadFull() throws IOException {
var first = new DomainRecord(
"www.marginalia.nu",
10,
3,
5,
"'sall good man",
null,
"127.0.0.1",
List.of("a", "b")
);
var second = new DomainRecord(
"memex.marginalia.nu",
0,
0,
0,
"REDIRECT",
"www.marginalia.nu",
"127.0.0.1",
null
);
try (var writer = new DomainRecordParquetFileWriter(parquetFile)) {
writer.write(first);
writer.write(second);
}
var domainNames = DomainRecordParquetFileReader.getDomainNames(parquetFile);
assertEquals(List.of("www.marginalia.nu", "memex.marginalia.nu"), domainNames);
var items = DomainRecordParquetFileReader
.stream(parquetFile)
.toList();
assertEquals(List.of(first, second), items);
}
}

View File

@ -0,0 +1,33 @@
plugins {
id 'java'
id "io.freefair.lombok" version "8.2.2"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
dependencies {
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.notnull
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,44 @@
package nu.marginalia.worklog;
import java.io.IOException;
/** The BatchingWorkLog is a work log for items of work performed in batches,
* where each batch needs to be finalized before the items it consists of can be
* considered done. This is needed when the data is serialized into a format such
* as Parquet, where disparate items go into the same file, and the writer needs to be
* properly closed before the file can be read.
*/
public interface BatchingWorkLog extends AutoCloseable {
/** Returns true if logItem(id) has been run,
* and logFinishedBatch has been run after that.
*/
boolean isItemCommitted(String id);
/** Returns true if logItem(id) has been run
* but not logFinishedBatch().
* <p/>
* Unlike isItemCommitted(), this state is ephemeral and not
* retained if e.g. the process crashes and resumes.
* */
boolean isItemInCurrentBatch(String id);
default boolean isItemProcessed(String id) {
return isItemCommitted(id) || isItemInCurrentBatch(id);
}
/** Log additional item to the current batch */
void logItem(String id) throws IOException;
/** Mark the current batch as finished and increment
* the batch number counter
*/
void logFinishedBatch() throws IOException;
int getBatchNumber();
/** Returns false if logItem has been invoked since last logFinishedBatch */
boolean isCurrentBatchEmpty();
int size();
}
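A rough sketch of the intended call pattern, assuming the file-backed implementation below (BatchingWorkLogImpl) and domain names as item ids:
```java
void convertAll(List<String> domains, Path logFile) throws IOException {
    try (var log = new BatchingWorkLogImpl(logFile)) {
        for (String domain : domains) {
            if (log.isItemProcessed(domain))
                continue;              // committed earlier, or already logged in this run

            // ... convert the domain and append the result to the current parquet batch ...

            log.logItem(domain);
        }
        // close the batch's parquet writers first, then mark the batch as finalized:
        log.logFinishedBatch();
    }
}
```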

View File

@ -0,0 +1,242 @@
package nu.marginalia.worklog;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
public class BatchingWorkLogImpl implements BatchingWorkLog {
private int batchNumber = 0;
private final Set<String> currentBatchItems = ConcurrentHashMap.newKeySet(10_000);
private final Set<String> committedItems = ConcurrentHashMap.newKeySet(10_000);
private final OutputStream writer;
/** Create or open a work log for appending new entries.
* <p></p>
* Opening a work log this way will cause it to be modified
* with a comment annotating when it was opened, and possibly
* a crash marker to indicate that data is to be discarded.
* <p></p>
* Use BatchingWorkLogInspector for read-only access!
*/
public BatchingWorkLogImpl(Path file) throws IOException {
if (Files.exists(file)) {
try (var linesStream = Files.lines(file)) {
linesStream.map(WorkLogItem::parse).forEach(
item -> item.replay(this)
);
}
writer = Files.newOutputStream(file, StandardOpenOption.APPEND);
// This is helpful for debugging, and will also ensure any partially written line
// gets a newline at the end
writeLogEntry(new CommentLine("Log resumed on " + LocalDateTime.now()));
if (getCurrentBatchSize() > 0) {
writeLogEntry(new CrashMarker());
}
}
else {
writer = Files.newOutputStream(file, StandardOpenOption.CREATE_NEW);
writeLogEntry(new CommentLine("Log created on " + LocalDateTime.now()));
writeLogEntry(new CommentLine(" Format: "));
writeLogEntry(new CommentLine(" " + AddItem.MARKER + " ID\tsignifies adding an item to the current batch"));
writeLogEntry(new CommentLine(" " + FinishBatch.MARKER + "\tsignifies finalizing the current batch and switching to the next"));
writeLogEntry(new CommentLine(" " + CrashMarker.MARKER + "\tdiscard contents from the current batch and start over, written after a crash"));
writeLogEntry(new CommentLine("Upon a crash, items that have re-process until their batch is finalized"));
}
}
void writeLogEntry(WorkLogItem item) throws IOException {
item.write(this);
}
void writeLine(String line) throws IOException {
writer.write(line.getBytes(StandardCharsets.UTF_8));
writer.write('\n');
writer.flush();
}
@Override
public boolean isItemCommitted(String id) {
return committedItems.contains(id);
}
@Override
public boolean isItemInCurrentBatch(String id) {
return currentBatchItems.contains(id);
}
@Override
public void logItem(String id) throws IOException {
writeLogEntry(new AddItem(id));
addItemToCurrentBatch(id);
}
@Override
public void logFinishedBatch() throws IOException {
writeLogEntry(new FinishBatch());
incrementBatch();
}
void incrementBatch() {
batchNumber++;
// Transfer all items from the current batch to the set of committed items
committedItems.addAll(currentBatchItems);
currentBatchItems.clear();
}
void restartBatch() {
currentBatchItems.clear();
}
void addItemToCurrentBatch(String id) {
currentBatchItems.add(id);
}
@Override
public void close() throws IOException {
writer.flush();
writer.close();
}
@Override
public int getBatchNumber() {
return batchNumber;
}
@Override
public boolean isCurrentBatchEmpty() {
return currentBatchItems.isEmpty();
}
public int getCurrentBatchSize() {
return currentBatchItems.size();
}
public int size() {
return currentBatchItems.size() + committedItems.size();
}
}
interface WorkLogItem {
void replay(BatchingWorkLogImpl bwl);
void write(BatchingWorkLogImpl bwl) throws IOException;
static WorkLogItem parse(String line) {
if (line.isBlank())
return new BlankLine();
var lineParts = LogLineParts.parse(line);
return switch (lineParts.tag()) {
case CommentLine.MARKER -> new CommentLine(lineParts.arg());
case AddItem.MARKER -> new AddItem(lineParts.arg());
case FinishBatch.MARKER -> new FinishBatch();
case CrashMarker.MARKER -> new CrashMarker();
default -> throw new WorkLogParseException(line);
};
}
}
record LogLineParts(char tag, String arg) {
public static LogLineParts parse(String line) {
line = line.trim();
char tag = line.charAt(0);
String arg = line.substring(1).trim();
int commentIdx = arg.indexOf('#');
if (commentIdx >= 0) arg = arg.substring(0, commentIdx).trim();
return new LogLineParts(tag, arg);
}
}
record CommentLine(String comment) implements WorkLogItem {
final static char MARKER = '#';
@Override
public void replay(BatchingWorkLogImpl bwl) {}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine(MARKER + " " + comment);
}
}
record BlankLine() implements WorkLogItem {
final static char MARKER = ' ';
@Override
public void replay(BatchingWorkLogImpl bwl) {}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine(MARKER + "");
}
}
record FinishBatch() implements WorkLogItem {
final static char MARKER = 'F';
@Override
public void replay(BatchingWorkLogImpl bwl) {
bwl.incrementBatch();
}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine("# " + LocalDateTime.now());
bwl.writeLine("# finalizing batchNumber = " + bwl.getBatchNumber());
bwl.writeLine(Character.toString(MARKER));
}
}
record CrashMarker() implements WorkLogItem {
final static char MARKER = 'X';
@Override
public void replay(BatchingWorkLogImpl bwl) {
bwl.restartBatch();
}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine("# " + LocalDateTime.now());
bwl.writeLine("# discarding batchNumber = " + bwl.getBatchNumber());
bwl.writeLine(Character.toString(MARKER));
}
}
record AddItem(String id) implements WorkLogItem {
final static char MARKER = '+';
@Override
public void replay(BatchingWorkLogImpl bwl) {
bwl.addItemToCurrentBatch(id);
}
@Override
public void write(BatchingWorkLogImpl bwl) throws IOException {
bwl.writeLine(MARKER + " " + id);
}
}
class WorkLogParseException extends RuntimeException {
@Serial
private static final long serialVersionUID = -1238138989389021166L;
public WorkLogParseException(String logLine) {
super("Failed to parse work log line: '" + logLine + "'");
}
}

View File

@ -0,0 +1,22 @@
package nu.marginalia.worklog;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
public class BatchingWorkLogInspector {
/** Batches up to, but not including, the return value of this method
are considered valid. If the method returns 2, then batches
0 and 1 are good, etc.
* <p></p>
* Invariant: BatchingWorkLogInspector.getValidBatches() always
* returns the same value as BatchingWorkLog.getBatchNumber()
*/
public static int getValidBatches(Path file) throws IOException {
try (var linesStream = Files.lines(file)) {
return (int) linesStream.map(WorkLogItem::parse)
.filter(FinishBatch.class::isInstance)
.count();
}
}
}
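A sketch of how a consumer might combine the inspector with ProcessedDataFileNames so that only finalized batches are read; the `processedDir` variable and the work log file name are assumptions here:
```java
int validBatches = BatchingWorkLogInspector.getValidBatches(processedDir.resolve("processor.log"));

// Only parquet files belonging to finalized batches are safe to read
for (Path file : ProcessedDataFileNames.listDocumentFiles(processedDir, validBatches)) {
    DocumentRecordParquetFileReader
            .streamKeywordsProjection(file)
            .filter(DocumentRecordKeywordsProjection::hasKeywords)
            .forEach(projection -> { /* feed the keywords into the index */ });
}
```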

View File

@ -0,0 +1,74 @@
package nu.marginalia.worklog;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import static org.junit.jupiter.api.Assertions.*;
class BatchingWorkLogImplTest {
Path fileName;
@BeforeEach
public void setUp() throws IOException {
fileName = Files.createTempFile(getClass().getSimpleName(), ".test");
}
@AfterEach
public void tearDown() throws IOException {
Files.deleteIfExists(fileName);
}
@Test
public void testResumeOnEmptyFile() throws IOException {
Files.delete(fileName);
try (var wl = new BatchingWorkLogImpl(fileName)) {
wl.logItem("1");
wl.logItem("2");
wl.logItem("3");
assertEquals(wl.getBatchNumber(), BatchingWorkLogInspector.getValidBatches(fileName));
wl.logFinishedBatch();
assertEquals(wl.getBatchNumber(), BatchingWorkLogInspector.getValidBatches(fileName));
wl.logItem("4");
wl.logItem("5");
assertEquals(wl.getBatchNumber(), BatchingWorkLogInspector.getValidBatches(fileName));
wl.logFinishedBatch();
wl.logItem("6");
assertEquals(wl.getBatchNumber(), BatchingWorkLogInspector.getValidBatches(fileName));
}
try (var wl = new BatchingWorkLogImpl(fileName)) {
assertTrue(wl.isItemCommitted("1"));
assertTrue(wl.isItemCommitted("2"));
assertTrue(wl.isItemCommitted("3"));
assertTrue(wl.isItemCommitted("4"));
assertTrue(wl.isItemCommitted("5"));
assertFalse(wl.isItemCommitted("6"));
wl.logItem("7");
wl.logFinishedBatch();
}
try (var wl = new BatchingWorkLogImpl(fileName)) {
assertTrue(wl.isItemCommitted("1"));
assertTrue(wl.isItemCommitted("2"));
assertTrue(wl.isItemCommitted("3"));
assertTrue(wl.isItemCommitted("4"));
assertTrue(wl.isItemCommitted("5"));
assertFalse(wl.isItemCommitted("6"));
assertTrue(wl.isItemCommitted("7"));
assertEquals(wl.getBatchNumber(), BatchingWorkLogInspector.getValidBatches(fileName));
}
Files.readAllLines(fileName).forEach(System.out::println);
}
}

View File

@ -43,7 +43,8 @@ dependencies {
implementation project(':code:libraries:big-string')
implementation project(':code:libraries:language-processing')
implementation project(':code:process-models:converting-model')
implementation project(':code:process-models:processed-data')
implementation project(':code:process-models:work-log')
implementation project(':code:process-models:crawling-model')
implementation project(':code:features-convert:adblock')

View File

@ -1,37 +0,0 @@
package nu.marginalia.converting;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdOutputStream;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
public class ConversionLog implements AutoCloseable, Interpreter {
private final PrintWriter writer;
public ConversionLog(Path rootDir) throws IOException {
String fileName = String.format("conversion-log-%s.zstd", LocalDateTime.now().toEpochSecond(ZoneOffset.UTC));
Path logFile = rootDir.resolve(fileName);
writer = new PrintWriter(new ZstdOutputStream(new BufferedOutputStream(Files.newOutputStream(logFile, StandardOpenOption.WRITE, StandardOpenOption.CREATE)), RecyclingBufferPool.INSTANCE));
}
@Override
public void close() throws IOException {
writer.close();
}
@Override
public synchronized void loadProcessedDocumentWithError(LoadProcessedDocumentWithError loadProcessedDocumentWithError) {
writer.printf("%s\t%s\n", loadProcessedDocumentWithError.url(), loadProcessedDocumentWithError.reason());
}
}

View File

@ -7,6 +7,8 @@ import com.google.inject.Injector;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.converting.sideload.SideloadSourceFactory;
import nu.marginalia.converting.writer.ConverterBatchWriter;
import nu.marginalia.converting.writer.ConverterWriter;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
@ -17,8 +19,9 @@ import nu.marginalia.process.control.ProcessHeartbeat;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.service.module.DatabaseModule;
import nu.marginalia.worklog.BatchingWorkLog;
import nu.marginalia.worklog.BatchingWorkLogImpl;
import plan.CrawlPlan;
import nu.marginalia.converting.compiler.InstructionsCompiler;
import nu.marginalia.converting.processor.DomainProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -36,7 +39,6 @@ import static nu.marginalia.mqapi.ProcessInboxNames.CONVERTER_INBOX;
public class ConverterMain {
private static final Logger logger = LoggerFactory.getLogger(ConverterMain.class);
private final DomainProcessor processor;
private final InstructionsCompiler compiler;
private final Gson gson;
private final ProcessHeartbeat heartbeat;
private final MessageQueueFactory messageQueueFactory;
@ -65,7 +67,6 @@ public class ConverterMain {
@Inject
public ConverterMain(
DomainProcessor processor,
InstructionsCompiler compiler,
Gson gson,
ProcessHeartbeatImpl heartbeat,
MessageQueueFactory messageQueueFactory,
@ -74,7 +75,6 @@ public class ConverterMain {
)
{
this.processor = processor;
this.compiler = compiler;
this.gson = gson;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
@ -85,22 +85,13 @@ public class ConverterMain {
}
public void convert(SideloadSource sideloadSource, Path writeDir) throws Exception {
int maxPoolSize = 16;
try (var writer = new ConverterBatchWriter(writeDir, 0);
BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(writeDir.resolve("processor.log"))
) {
writer.write(sideloadSource);
try (WorkLog workLog = new WorkLog(writeDir.resolve("processor.log"));
ConversionLog conversionLog = new ConversionLog(writeDir)) {
var instructionWriter = new InstructionWriterFactory(conversionLog, writeDir, gson);
final String where;
final int size;
try (var writer = instructionWriter.createInstructionsForDomainWriter(sideloadSource.getId())) {
compiler.compileStreaming(sideloadSource, writer::accept);
where = writer.getFileName();
size = writer.getSize();
}
workLog.setJobToFinished(sideloadSource.getId(), where, size);
// We write an empty log with just a finish marker for the sideloading action
batchingWorkLog.logFinishedBatch();
}
}
@ -108,40 +99,25 @@ public class ConverterMain {
final int maxPoolSize = Runtime.getRuntime().availableProcessors();
try (WorkLog processLog = plan.createProcessWorkLog();
ConversionLog log = new ConversionLog(plan.process.getDir())) {
var instructionWriter = new InstructionWriterFactory(log, plan.process.getDir(), gson);
try (BatchingWorkLog batchingWorkLog = new BatchingWorkLogImpl(plan.process.getLogFile());
ConverterWriter converterWriter = new ConverterWriter(batchingWorkLog, plan.process.getDir()))
{
var pool = new DumbThreadPool(maxPoolSize, 2);
int totalDomains = plan.countCrawledDomains();
AtomicInteger processedDomains = new AtomicInteger(0);
// Advance the progress bar to the current position if this is a resumption
processedDomains.set(processLog.countFinishedJobs());
processedDomains.set(batchingWorkLog.size());
heartbeat.setProgress(processedDomains.get() / (double) totalDomains);
for (var domain : plan.crawlDataIterable(id -> !processLog.isJobFinished(id)))
for (var domain : plan.crawlDataIterable(id -> !batchingWorkLog.isItemProcessed(id)))
{
pool.submit(() -> {
try {
ProcessedDomain processed = processor.process(domain);
converterWriter.accept(processed);
final String where;
final int size;
try (var writer = instructionWriter.createInstructionsForDomainWriter(processed.id)) {
compiler.compile(processed, writer::accept);
where = writer.getFileName();
size = writer.getSize();
}
processLog.setJobToFinished(processed.id, where, size);
heartbeat.setProgress(processedDomains.incrementAndGet() / (double) totalDomains);
}
catch (IOException ex) {
logger.warn("IO exception in converter", ex);
}
});
}

View File

@ -1,141 +0,0 @@
package nu.marginalia.converting;
import com.github.luben.zstd.ZstdOutputStream;
import com.google.gson.Gson;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
public class InstructionWriterFactory {
private final ConversionLog log;
private final Path outputDir;
private final Gson gson;
private static final Logger logger = LoggerFactory.getLogger(InstructionWriterFactory.class);
public InstructionWriterFactory(ConversionLog log, Path outputDir, Gson gson) {
this.log = log;
this.outputDir = outputDir;
this.gson = gson;
if (!Files.isDirectory(outputDir)) {
throw new IllegalArgumentException("Output dir " + outputDir + " does not exist");
}
}
public InstructionWriter createInstructionsForDomainWriter(String id) throws IOException {
Path outputFile = getOutputFile(id);
return new InstructionWriter(outputFile);
}
public class InstructionWriter implements AutoCloseable {
private final ObjectOutputStream outputStream;
private final String where;
private final SummarizingInterpreter summary = new SummarizingInterpreter();
private int size = 0;
InstructionWriter(Path filename) throws IOException {
where = filename.getFileName().toString();
Files.deleteIfExists(filename);
outputStream = new ObjectOutputStream(new ZstdOutputStream(new FileOutputStream(filename.toFile())));
}
public void accept(Instruction instruction) {
if (instruction.isNoOp()) return;
instruction.apply(summary);
instruction.apply(log);
size++;
try {
outputStream.writeObject(instruction);
// Reset the stream to avoid keeping references to the objects
// (as this will cause the memory usage to grow indefinitely when
// writing huge amounts of data)
outputStream.reset();
}
catch (IOException ex) {
logger.warn("IO exception writing instruction", ex);
}
}
@Override
public void close() throws IOException {
logger.info("Wrote {} - {} - {}", where, size, summary);
outputStream.close();
}
public String getFileName() {
return where;
}
public int getSize() {
return size;
}
}
private Path getOutputFile(String id) throws IOException {
String first = id.substring(0, 2);
String second = id.substring(2, 4);
Path destDir = outputDir.resolve(first).resolve(second);
if (!Files.exists(destDir)) {
Files.createDirectories(destDir);
}
return destDir.resolve(id + ".pzstd");
}
private static class SummarizingInterpreter implements Interpreter {
private String domainName;
private int ok = 0;
private int error = 0;
int keywords = 0;
int documents = 0;
public String toString() {
// This shouldn't happen (TM)
assert keywords == documents : "keywords != documents";
return String.format("%s - %d %d", domainName, ok, error);
}
@Override
public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {
this.domainName = domain.toString();
}
@Override
public void loadProcessedDocument(LoadProcessedDocument loadProcessedDocument) {
documents++;
}
@Override
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
keywords++;
}
@Override
public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {
ok += goodUrls;
error += visitedUrls - goodUrls;
}
}
}

View File

@ -1,59 +0,0 @@
package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadKeywords;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.crawl.HtmlFeature;
import java.util.List;
import java.util.function.Consumer;
public class DocumentsCompiler {
public void compileDocumentDetails(Consumer<Instruction> instructionConsumer,
ProcessedDocument doc,
int ordinal) {
var details = doc.details;
if (details != null) {
instructionConsumer.accept(new LoadProcessedDocument(doc.url,
ordinal,
doc.state,
details.title,
details.description,
HtmlFeature.encode(details.features),
details.standard.name(),
details.length,
details.hashCode,
details.quality,
details.pubYear
));
}
else {
instructionConsumer.accept(new LoadProcessedDocumentWithError(
doc.url,
doc.state,
doc.stateReason,
ordinal
));
}
}
public void compileWords(Consumer<Instruction> instructionConsumer,
ProcessedDocument doc,
int ordinal) {
var words = doc.words;
if (words != null) {
instructionConsumer.accept(new LoadKeywords(doc.url,
ordinal,
HtmlFeature.encode(doc.details.features),
doc.details.metadata,
words.build())
);
}
}
}

View File

@ -1,47 +0,0 @@
package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadDomainMetadata;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.jetbrains.annotations.NotNull;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.Consumer;
public class DomainMetadataCompiler {
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain domain, @NotNull List<ProcessedDocument> documents) {
int visitedUrls = 0;
int goodUrls = 0;
Set<EdgeUrl> knownUrls = new HashSet<>(documents.size() * 2);
for (var doc : documents) {
visitedUrls++;
if (doc.isOk()) {
goodUrls++;
}
knownUrls.add(doc.url);
Optional.ofNullable(doc.details)
.map(details -> details.linksInternal)
.ifPresent(knownUrls::addAll);
}
instructionConsumer.accept(new LoadDomainMetadata(domain, knownUrls.size(), goodUrls, visitedUrls));
}
public void compileFake(Consumer<Instruction> instructionConsumer, EdgeDomain domain, int countAll, int countGood) {
instructionConsumer.accept(new LoadDomainMetadata(domain, countAll, countGood, countAll));
}
}

View File

@ -1,24 +0,0 @@
package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadRssFeed;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.EdgeUrl;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
public class FeedsCompiler {
public void compile(Consumer<Instruction> instructionConsumer, List<ProcessedDocument> documents) {
EdgeUrl[] feeds = documents.stream().map(doc -> doc.details)
.filter(Objects::nonNull)
.flatMap(dets -> dets.feedLinks.stream())
.distinct()
.toArray(EdgeUrl[]::new);
instructionConsumer.accept(new LoadRssFeed(feeds));
}
}

View File

@ -1,88 +0,0 @@
package nu.marginalia.converting.compiler;
import com.google.inject.Inject;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDomain;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
import java.util.function.Consumer;
import static java.util.Objects.requireNonNullElse;
public class InstructionsCompiler {
private final DocumentsCompiler documentsCompiler;
private final DomainMetadataCompiler domainMetadataCompiler;
private final FeedsCompiler feedsCompiler;
private final LinksCompiler linksCompiler;
private final RedirectCompiler redirectCompiler;
private final Logger logger = LoggerFactory.getLogger(InstructionsCompiler.class);
@Inject
public InstructionsCompiler(DocumentsCompiler documentsCompiler,
DomainMetadataCompiler domainMetadataCompiler,
FeedsCompiler feedsCompiler,
LinksCompiler linksCompiler,
RedirectCompiler redirectCompiler)
{
this.documentsCompiler = documentsCompiler;
this.domainMetadataCompiler = domainMetadataCompiler;
this.feedsCompiler = feedsCompiler;
this.linksCompiler = linksCompiler;
this.redirectCompiler = redirectCompiler;
}
public void compile(ProcessedDomain domain, Consumer<Instruction> instructionConsumer) {
// Guaranteed to always be first
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
if (domain.documents != null) {
int ordinal = 0;
for (var doc : domain.documents) {
documentsCompiler.compileDocumentDetails(instructionConsumer, doc, ordinal);
documentsCompiler.compileWords(instructionConsumer, doc, ordinal);
ordinal++;
}
feedsCompiler.compile(instructionConsumer, domain.documents);
linksCompiler.compile(instructionConsumer, domain.domain, domain.documents);
}
if (domain.redirect != null) {
redirectCompiler.compile(instructionConsumer, domain.domain, domain.redirect);
}
domainMetadataCompiler.compile(instructionConsumer, domain.domain, requireNonNullElse(domain.documents, Collections.emptyList()));
}
public void compileStreaming(SideloadSource sideloadSource,
Consumer<Instruction> instructionConsumer) {
ProcessedDomain domain = sideloadSource.getDomain();
Iterator<ProcessedDocument> documentsIterator = sideloadSource.getDocumentsStream();
// Guaranteed to always be first
instructionConsumer.accept(new LoadProcessedDomain(domain.domain, domain.state, domain.ip));
int countAll = 0;
int countGood = 0;
logger.info("Writing docs");
while (documentsIterator.hasNext()) {
var doc = documentsIterator.next();
countAll++;
if (doc.isOk()) countGood++;
documentsCompiler.compileDocumentDetails(instructionConsumer, doc, countAll);
documentsCompiler.compileWords(instructionConsumer, doc, countAll);
}
domainMetadataCompiler.compileFake(instructionConsumer, domain.domain, countAll, countGood);
}
}

View File

@ -1,35 +0,0 @@
package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.converting.instruction.instructions.LoadDomain;
import nu.marginalia.converting.instruction.instructions.LoadDomainLink;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.model.EdgeDomain;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
public class LinksCompiler {
public void compile(Consumer<Instruction> instructionConsumer,
EdgeDomain from,
List<ProcessedDocument> documents) {
EdgeDomain[] domains = documents.stream()
.map(doc -> doc.details)
.filter(Objects::nonNull)
.flatMap(details -> details.linksExternal.stream())
.map(link -> link.domain)
.distinct()
.toArray(EdgeDomain[]::new);
DomainLink[] links = new DomainLink[domains.length];
Arrays.setAll(links, i -> new DomainLink(from, domains[i]));
instructionConsumer.accept(new LoadDomain(domains));
instructionConsumer.accept(new LoadDomainLink(links));
}
}

View File

@ -1,20 +0,0 @@
package nu.marginalia.converting.compiler;
import nu.marginalia.converting.instruction.Instruction;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.converting.instruction.instructions.LoadDomain;
import nu.marginalia.converting.instruction.instructions.LoadDomainLink;
import nu.marginalia.converting.instruction.instructions.LoadDomainRedirect;
import nu.marginalia.model.EdgeDomain;
import java.util.List;
import java.util.function.Consumer;
public class RedirectCompiler {
public void compile(Consumer<Instruction> instructionConsumer, EdgeDomain from, EdgeDomain to) {
instructionConsumer.accept(new LoadDomain(to));
instructionConsumer.accept(new LoadDomainLink(new DomainLink(from, to)));
instructionConsumer.accept(new LoadDomainRedirect(new DomainLink(from, to)));
}
}

View File

@ -2,6 +2,7 @@ package nu.marginalia.converting.model;
import lombok.ToString;
import nu.marginalia.model.crawl.HtmlFeature;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.model.EdgeUrl;

View File

@ -2,14 +2,13 @@ package nu.marginalia.converting.processor.logic;
import crawlercommons.utils.Strings;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.converting.model.DisqualifiedException;
import nu.marginalia.model.crawl.HtmlFeature;
import org.jetbrains.annotations.NotNull;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.Element;
import org.jsoup.nodes.Node;
import org.jsoup.nodes.TextNode;
import org.jsoup.select.NodeVisitor;
import java.util.Set;

View File

@ -1,7 +1,7 @@
package nu.marginalia.converting.processor.logic;
import com.google.common.base.Strings;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import org.jsoup.nodes.Document;
import org.jsoup.nodes.DocumentType;
import org.slf4j.Logger;

View File

@ -4,7 +4,6 @@ import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import org.jsoup.nodes.Document;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

View File

@ -1,10 +1,9 @@
package nu.marginalia.converting.processor.plugin;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.converting.language.LanguageFilter;
import nu.marginalia.language.model.DocumentLanguageData;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.keyword.model.DocumentKeywordsBuilder;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.converting.model.DisqualifiedException;

View File

@ -13,10 +13,9 @@ import nu.marginalia.language.model.DocumentLanguageData;
import nu.marginalia.model.crawl.HtmlFeature;
import nu.marginalia.link_parser.LinkParser;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.keyword.DocumentKeywordExtractor;
import nu.marginalia.language.sentence.SentenceExtractor;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.idx.DocumentFlags;
import nu.marginalia.keyword.model.DocumentKeywordsBuilder;
import nu.marginalia.model.idx.DocumentMetadata;

View File

@ -5,10 +5,9 @@ import com.google.inject.name.Named;
import nu.marginalia.converting.language.LanguageFilter;
import nu.marginalia.converting.processor.logic.DocumentLengthLogic;
import nu.marginalia.crawling.model.CrawledDocument;
import nu.marginalia.crawling.model.CrawledDomain;
import nu.marginalia.keyword.DocumentKeywordExtractor;
import nu.marginalia.language.sentence.SentenceExtractor;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.idx.DocumentFlags;
import nu.marginalia.keyword.model.DocumentKeywordsBuilder;
import nu.marginalia.model.idx.DocumentMetadata;

View File

@ -10,6 +10,7 @@ import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.crawl.HtmlFeature;
import nu.marginalia.model.crawl.PubDate;
import nu.marginalia.model.crawl.UrlIndexingState;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.model.idx.DocumentFlags;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.model.idx.WordFlags;

View File

@ -0,0 +1,254 @@
package nu.marginalia.converting.writer;
import gnu.trove.list.TLongList;
import gnu.trove.list.array.TLongArrayList;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.converting.sideload.SideloadSource;
import nu.marginalia.io.processed.DocumentRecordParquetFileWriter;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileWriter;
import nu.marginalia.io.processed.DomainRecordParquetFileWriter;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.crawl.HtmlFeature;
import nu.marginalia.model.processed.DocumentRecord;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.model.processed.DomainRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.util.*;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
/** Writer for a single batch of converter parquet files */
public class ConverterBatchWriter implements AutoCloseable {
private final DomainRecordParquetFileWriter domainWriter;
private final DomainLinkRecordParquetFileWriter domainLinkWriter;
private final DocumentRecordParquetFileWriter documentWriter;
private static final Logger logger = LoggerFactory.getLogger(ConverterBatchWriter.class);
public ConverterBatchWriter(Path basePath, int batchNumber) throws IOException {
domainWriter = new DomainRecordParquetFileWriter(
ProcessedDataFileNames.domainFileName(basePath, batchNumber)
);
domainLinkWriter = new DomainLinkRecordParquetFileWriter(
ProcessedDataFileNames.domainLinkFileName(basePath, batchNumber)
);
documentWriter = new DocumentRecordParquetFileWriter(
ProcessedDataFileNames.documentFileName(basePath, batchNumber)
);
}
public void write(SideloadSource sideloadSource) throws IOException {
var domain = sideloadSource.getDomain();
writeDomainData(domain);
writeDocumentData(domain.domain, sideloadSource.getDocumentsStream());
}
public void write(ProcessedDomain domain) {
var results = ForkJoinPool.commonPool().invokeAll(
writeTasks(domain)
);
for (var result : results) {
if (result.state() == Future.State.FAILED) {
logger.warn("Parquet writing job failed", result.exceptionNow());
}
}
}
private List<Callable<Object>> writeTasks(ProcessedDomain domain) {
return List.of(
() -> writeDocumentData(domain),
() -> writeLinkData(domain),
() -> writeDomainData(domain)
);
}
private Object writeDocumentData(ProcessedDomain domain) throws IOException {
if (domain.documents == null)
return this;
writeDocumentData(domain.domain, domain.documents.iterator());
return this;
}
private void writeDocumentData(EdgeDomain domain,
Iterator<ProcessedDocument> documentIterator)
throws IOException
{
int ordinal = 0;
String domainName = domain.toString();
while (documentIterator.hasNext()) {
var document = documentIterator.next();
if (document.details == null) {
// write a record even when there are no details, so the document's error state is preserved
documentWriter.write(new DocumentRecord(
domainName,
document.url.toString(),
ordinal,
document.state.toString(),
document.stateReason,
null,
null,
0,
null,
0,
0L,
-15,
0L,
null,
null,
null));
}
else {
var wb = document.words.build();
List<String> words = Arrays.asList(wb.keywords);
TLongList metas = new TLongArrayList(wb.metadata);
documentWriter.write(new DocumentRecord(
domainName,
document.url.toString(),
ordinal,
document.state.toString(),
document.stateReason,
document.details.title,
document.details.description,
HtmlFeature.encode(document.details.features),
document.details.standard.name(),
document.details.length,
document.details.hashCode,
(float) document.details.quality,
document.details.metadata.encode(),
document.details.pubYear,
words,
metas
));
}
ordinal++;
}
}
private Object writeLinkData(ProcessedDomain domain) throws IOException {
String from = domain.domain.toString();
if (domain.documents == null)
return this;
Set<EdgeDomain> seen = new HashSet<>();
for (var doc : domain.documents) {
if (doc.details == null)
continue;
for (var link : doc.details.linksExternal) {
var dest = link.domain;
if (!seen.add(dest)) {
continue;
}
domainLinkWriter.write(new DomainLinkRecord(
from,
dest.toString()
));
}
}
if (domain.redirect != null) {
domainLinkWriter.write(new DomainLinkRecord(
from,
domain.redirect.toString()
));
}
return this;
}
private Object writeDomainData(ProcessedDomain domain) throws IOException {
DomainMetadata metadata = DomainMetadata.from(domain);
List<String> feeds = getFeedUrls(domain);
domainWriter.write(
new DomainRecord(
domain.domain.toString(),
metadata.known(),
metadata.good(),
metadata.visited(),
Optional.ofNullable(domain.state).map(DomainIndexingState::toString).orElse(null),
Optional.ofNullable(domain.redirect).map(EdgeDomain::toString).orElse(null),
domain.ip,
feeds
)
);
return this;
}
private List<String> getFeedUrls(ProcessedDomain domain) {
var documents = domain.documents;
if (documents == null)
return List.of();
return documents.stream().map(doc -> doc.details)
.filter(Objects::nonNull)
.flatMap(dets -> dets.feedLinks.stream())
.distinct()
.map(EdgeUrl::toString)
.toList();
}
public void close() throws IOException {
domainWriter.close();
documentWriter.close();
domainLinkWriter.close();
}
}
record DomainMetadata(int known, int good, int visited) {
public static DomainMetadata from(ProcessedDomain domain) {
var documents = domain.documents;
if (documents == null) {
return new DomainMetadata(0, 0, 0);
}
int visitedUrls = 0;
int goodUrls = 0;
Set<EdgeUrl> knownUrls = new HashSet<>();
for (var doc : documents) {
visitedUrls++;
if (doc.isOk()) {
goodUrls++;
}
knownUrls.add(doc.url);
Optional.ofNullable(doc.details)
.map(details -> details.linksInternal)
.ifPresent(knownUrls::addAll);
}
return new DomainMetadata(knownUrls.size(), goodUrls, visitedUrls);
}
}
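As a small orientation sketch (not part of the commit): each batch written by ConverterBatchWriter consists of three parquet files, whose names come from the ProcessedDataFileNames helpers used in the constructor above; the output directory below is hypothetical:

import java.nio.file.Path;

import nu.marginalia.io.processed.ProcessedDataFileNames;

class BatchFileNamesSketch {
    public static void main(String[] args) {
        Path base = Path.of("/tmp/converter-output"); // hypothetical output directory
        int batch = 0;

        // One domain file, one domain-link file and one document file per batch
        System.out.println(ProcessedDataFileNames.domainFileName(base, batch));
        System.out.println(ProcessedDataFileNames.domainLinkFileName(base, batch));
        System.out.println(ProcessedDataFileNames.documentFileName(base, batch));
    }
}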

View File

@ -0,0 +1,139 @@
package nu.marginalia.converting.writer;
import lombok.SneakyThrows;
import nu.marginalia.converting.model.ProcessedDomain;
import nu.marginalia.worklog.BatchingWorkLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class ConverterWriter implements AutoCloseable {
private static final Logger logger = LoggerFactory.getLogger(ConverterWriter.class);
private final BatchingWorkLog workLog;
private final Path basePath;
private final Duration switchInterval
= Duration.of(10, ChronoUnit.MINUTES);
private final ArrayBlockingQueue<ProcessedDomain> domainData
= new ArrayBlockingQueue<>(4);
private final Thread workerThread;
private ConverterBatchWriter currentWriter;
volatile boolean running = true;
public ConverterWriter(BatchingWorkLog workLog, Path basePath) {
this.workLog = workLog;
this.basePath = basePath;
workerThread = new Thread(this::writerThread, getClass().getSimpleName());
workerThread.start();
}
@SneakyThrows
public void accept(ProcessedDomain domain) {
domainData.put(domain);
}
@SneakyThrows
private void writerThread() {
IntervalAction switcher = new IntervalAction(this::switchBatch, switchInterval);
currentWriter = new ConverterBatchWriter(basePath, workLog.getBatchNumber());
while (running || !domainData.isEmpty()) {
// poll with a timeout so we have an
// opportunity to check the running condition
// ... we could interrupt the thread as well, but
// as we enter third party code it's difficult to guarantee it will deal
// well with being interrupted
var data = domainData.poll(1, TimeUnit.SECONDS);
if (data == null)
continue;
String id = data.id;
if (workLog.isItemCommitted(id) || workLog.isItemInCurrentBatch(id)) {
logger.warn("Skipping already logged item {}", id);
continue;
}
currentWriter.write(data);
workLog.logItem(id);
switcher.tick();
}
}
@SneakyThrows
public boolean switchBatch() {
if (workLog.isCurrentBatchEmpty()) {
// Nothing to commit
return false;
}
// order matters here
currentWriter.close();
workLog.logFinishedBatch();
logger.info("Switching to batch {}", workLog.getBatchNumber());
currentWriter = new ConverterBatchWriter(basePath, workLog.getBatchNumber());
return true;
}
@Override
public void close() throws Exception {
running = false;
workerThread.join();
// order matters here
currentWriter.close();
workLog.logFinishedBatch();
}
}
class IntervalAction {
private final Callable<Boolean> action;
private final Duration interval;
private Instant nextActionInstant;
IntervalAction(Callable<Boolean> action, Duration interval) {
this.action = action;
this.interval = interval;
}
/** Execute the provided action if enough time has passed
* since the last successful invocation */
public void tick() {
var now = Instant.now();
if (nextActionInstant == null) {
nextActionInstant = now.plus(interval);
return;
}
try {
if (now.isAfter(nextActionInstant)
&& action.call())
{
nextActionInstant = now.plus(interval);
}
}
catch (Exception ex) {
throw new RuntimeException(ex);
}
}
}
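A minimal sketch (not part of the diff) of the IntervalAction contract, placed in the same package since the class is package-private; the two-second interval and the print statement are invented for illustration:

package nu.marginalia.converting.writer;

import java.time.Duration;
import java.time.temporal.ChronoUnit;

class IntervalActionSketch {
    public static void main(String[] args) throws InterruptedException {
        // The action returns true only when it actually performed work,
        // which is what re-arms the interval (cf. switchBatch() above).
        IntervalAction flusher = new IntervalAction(
                () -> { System.out.println("pretend batch switch"); return true; },
                Duration.of(2, ChronoUnit.SECONDS));

        for (int i = 0; i < 10; i++) {
            flusher.tick();       // cheap to call often; fires at most once per interval
            Thread.sleep(500);
        }
    }
}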

View File

@ -3,7 +3,7 @@ package nu.marginalia.converting;
import com.google.inject.Guice;
import com.google.inject.Injector;
import nu.marginalia.converting.model.HtmlStandard;
import nu.marginalia.model.html.HtmlStandard;
import nu.marginalia.converting.model.ProcessedDocument;
import nu.marginalia.converting.processor.DomainProcessor;
import nu.marginalia.crawling.io.SerializableCrawlDataStream;

View File

@ -34,7 +34,6 @@ dependencies {
implementation project(':code:libraries:language-processing')
implementation project(':code:libraries:easy-lsh')
implementation project(':code:process-models:crawling-model')
implementation project(':code:process-models:converting-model')
implementation project(':code:features-crawl:crawl-blocklist')
implementation project(':code:features-crawl:link-parser')

View File

@ -35,7 +35,8 @@ dependencies {
testImplementation project(':code:services-core:search-service')
implementation project(':code:process-models:crawling-model')
implementation project(':code:process-models:converting-model')
implementation project(':code:process-models:processed-data')
implementation project(':code:process-models:work-log')
implementation project(':code:features-convert:keyword-extraction')

View File

@ -1,109 +0,0 @@
package nu.marginalia.loading;
import com.github.luben.zstd.RecyclingBufferPool;
import com.github.luben.zstd.ZstdInputStream;
import lombok.SneakyThrows;
import nu.marginalia.converting.instruction.Instruction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.lang.ref.Cleaner;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
public class ConvertedDomainReader {
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
private static final Logger logger = LoggerFactory.getLogger(ConvertedDomainReader.class);
/** Creates a new iterator over the instructions stored at the given path. The implementation reads the file in a
* separate thread, and will block until the first instruction is available. Iterator#hasNext may block.
*/
public Iterator<Instruction> createIterator(Path path) {
return new PrefetchingInstructionIterator(path);
}
class PrefetchingInstructionIterator implements Iterator<Instruction> {
private final LinkedBlockingQueue<Instruction> queue = new LinkedBlockingQueue<>(16);
private final AtomicBoolean finished = new AtomicBoolean(false);
private Instruction next = null;
private final static Cleaner cleaner = Cleaner.create();
static class CancelAction implements Runnable {
private final Future<?> future;
public CancelAction(Future<Object> taskFuture) {
this.future = taskFuture;
}
public void run() {
future.cancel(true);
}
}
public PrefetchingInstructionIterator(Path path) {
var taskFuture = executorService.submit(() -> readerThread(path));
// Cancel the future if the iterator is garbage collected
// to reduce the risk of leaking resources; as the worker thread
// will spin forever on put if the queue is full.
cleaner.register(this, new CancelAction(taskFuture));
}
private Object readerThread(Path path) {
try (var or = new ObjectInputStream(new ZstdInputStream(new BufferedInputStream(new FileInputStream(path.toFile())), RecyclingBufferPool.INSTANCE))) {
for (;;) {
var nextObject = or.readObject();
if (nextObject instanceof Instruction is) {
queue.put(is);
} else {
logger.warn("Spurious object in file: {}", nextObject.getClass().getSimpleName());
}
}
} catch (EOFException ex) {
// Expected
return null;
} catch (ClassNotFoundException | IOException | InterruptedException e) {
logger.warn("Error reading file " + path, e);
throw new RuntimeException(e);
} finally {
finished.set(true);
}
}
@SneakyThrows
@Override
public boolean hasNext() {
if (next != null)
return true;
// As long as the worker is still running, we'll do a blocking poll to wait for the next instruction
// (but we wake up every second to check if the worker is still running)
while (!finished.get()) {
if (null != (next = queue.poll(1, TimeUnit.SECONDS))) {
return true;
}
}
// If the worker is not running, we just drain the queue without waiting
return null != (next = queue.poll());
}
@Override
public Instruction next() {
if (next != null || hasNext()) {
try { return next; }
finally { next = null; }
}
throw new IllegalStateException();
}
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.loading.loader;
package nu.marginalia.loading;
import com.google.inject.Inject;
import com.google.inject.Singleton;
@ -44,12 +44,20 @@ public class LoaderIndexJournalWriter {
indexWriter = new IndexJournalWriterPagingImpl(indexArea.asPath());
}
@SneakyThrows
public void putWords(long combinedId,
int features,
DocumentMetadata metadata,
DocumentKeywords wordSet) {
putWords(combinedId, features, metadata.encode(), wordSet);
}
@SneakyThrows
public void putWords(long combinedId,
int features,
long metadata,
DocumentKeywords wordSet) {
if (wordSet.isEmpty()) {
logger.info("Skipping zero-length word set for {}", combinedId);
return;
@ -75,7 +83,7 @@ public class LoaderIndexJournalWriter {
}
var entry = new IndexJournalEntryData(i, buffer);
var header = new IndexJournalEntryHeader(combinedId, features, metadata.encode());
var header = new IndexJournalEntryHeader(combinedId, features, metadata);
indexWriter.put(header, entry);
}

View File

@ -5,29 +5,31 @@ import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;
import lombok.SneakyThrows;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.db.storage.FileStorageService;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.loading.loader.IndexLoadKeywords;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.loading.documents.DocumentLoaderService;
import nu.marginalia.loading.documents.KeywordLoaderService;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.loading.domains.DomainLoaderService;
import nu.marginalia.loading.links.DomainLinksLoaderService;
import nu.marginalia.mq.MessageQueueFactory;
import nu.marginalia.mq.MqMessage;
import nu.marginalia.mq.inbox.MqInboxResponse;
import nu.marginalia.mq.inbox.MqSingleShotInbox;
import nu.marginalia.process.control.ProcessHeartbeatImpl;
import nu.marginalia.process.log.WorkLog;
import nu.marginalia.worklog.BatchingWorkLogInspector;
import plan.CrawlPlan;
import nu.marginalia.loading.loader.LoaderFactory;
import nu.marginalia.service.module.DatabaseModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
@ -35,14 +37,15 @@ import static nu.marginalia.mqapi.ProcessInboxNames.LOADER_INBOX;
public class LoaderMain {
private static final Logger logger = LoggerFactory.getLogger(LoaderMain.class);
private final ConvertedDomainReader instructionsReader;
private final LoaderFactory loaderFactory;
private final ProcessHeartbeatImpl heartbeat;
private final MessageQueueFactory messageQueueFactory;
private final FileStorageService fileStorageService;
private final IndexLoadKeywords indexLoadKeywords;
private final LinkdbWriter writer;
private final LinkdbWriter linkdbWriter;
private final LoaderIndexJournalWriter journalWriter;
private final DomainLoaderService domainService;
private final DomainLinksLoaderService linksService;
private final KeywordLoaderService keywordLoaderService;
private final DocumentLoaderService documentLoaderService;
private final Gson gson;
public static void main(String... args) throws Exception {
@ -65,87 +68,70 @@ public class LoaderMain {
}
@Inject
public LoaderMain(ConvertedDomainReader instructionsReader,
LoaderFactory loaderFactory,
ProcessHeartbeatImpl heartbeat,
public LoaderMain(ProcessHeartbeatImpl heartbeat,
MessageQueueFactory messageQueueFactory,
FileStorageService fileStorageService,
IndexLoadKeywords indexLoadKeywords,
LinkdbWriter writer,
LinkdbWriter linkdbWriter,
LoaderIndexJournalWriter journalWriter,
DomainLoaderService domainService,
DomainLinksLoaderService linksService,
KeywordLoaderService keywordLoaderService,
DocumentLoaderService documentLoaderService,
Gson gson
) {
this.instructionsReader = instructionsReader;
this.loaderFactory = loaderFactory;
this.heartbeat = heartbeat;
this.messageQueueFactory = messageQueueFactory;
this.fileStorageService = fileStorageService;
this.indexLoadKeywords = indexLoadKeywords;
this.writer = writer;
this.linkdbWriter = linkdbWriter;
this.journalWriter = journalWriter;
this.domainService = domainService;
this.linksService = linksService;
this.keywordLoaderService = keywordLoaderService;
this.documentLoaderService = documentLoaderService;
this.gson = gson;
heartbeat.start();
}
@SneakyThrows
public void run(LoadRequest instructions) {
void run(LoadRequest instructions) {
var plan = instructions.getPlan();
var logFile = plan.process.getLogFile();
var processLogFile = plan.process.getLogFile();
Path inputDataDir = plan.process.getDir();
int validBatchCount = BatchingWorkLogInspector.getValidBatches(processLogFile);
DomainIdRegistry domainIdRegistry =
domainService.getOrCreateDomainIds(
inputDataDir,
validBatchCount);
TaskStats taskStats = new TaskStats(100);
try {
int loadTotal = 0;
int loaded = 0;
var results = ForkJoinPool.commonPool()
.invokeAll(
List.of(
() -> linksService.loadLinks(domainIdRegistry, heartbeat, inputDataDir, validBatchCount),
() -> keywordLoaderService.loadKeywords(domainIdRegistry, heartbeat, inputDataDir, validBatchCount),
() -> documentLoaderService.loadDocuments(domainIdRegistry, heartbeat, inputDataDir, validBatchCount)
)
);
for (var unused : WorkLog.iterable(logFile)) {
loadTotal++;
for (var result : results) {
if (result.state() == Future.State.FAILED) {
throw result.exceptionNow();
}
logger.info("Loading {} files", loadTotal);
for (var entry : WorkLog.iterable(logFile)) {
InstructionCounter instructionCounter = new InstructionCounter();
heartbeat.setProgress(loaded++ / (double) loadTotal);
long startTime = System.currentTimeMillis();
Path destDir = plan.getProcessedFilePath(entry.path());
try (var loader = loaderFactory.create(entry.cnt())) {
var instructionsIter = instructionsReader.createIterator(destDir);
while (instructionsIter.hasNext()) {
var next = instructionsIter.next();
try {
next.apply(instructionCounter);
next.apply(loader);
} catch (Exception ex) {
logger.error("Failed to load instruction " + next.getClass().getSimpleName(), ex);
}
}
}
long endTime = System.currentTimeMillis();
long loadTime = endTime - startTime;
taskStats.observe(endTime - startTime);
logger.info("Loaded {}/{} : {} ({}) {}ms {} l/s", taskStats.getCount(),
loadTotal, destDir, instructionCounter.getCount(), loadTime, taskStats.avgTime());
}
instructions.ok();
// This needs to be done in order to have a readable index journal
indexLoadKeywords.close();
writer.close();
logger.info("Loading finished");
}
catch (Exception ex) {
ex.printStackTrace();
logger.error("Failed to load", ex);
instructions.err();
throw ex;
logger.error("Error", ex);
}
finally {
journalWriter.close();
linkdbWriter.close();
heartbeat.shutDown();
}
@ -213,15 +199,4 @@ public class LoaderMain {
}
}
public class InstructionCounter implements Interpreter {
private int count = 0;
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
count++;
}
public int getCount() {
return count;
}
}
}

View File

@ -1,37 +0,0 @@
package nu.marginalia.loading;
public class TaskStats {
private final long[] taskTimes;
private int count = 0;
private long total = 0;
public TaskStats(int windowSize) {
taskTimes = new long[windowSize];
}
public synchronized void observe(long time) {
taskTimes[count++%taskTimes.length] = time;
total += time;
}
public double avgTime() {
long tts = 0;
long tot;
if (count < taskTimes.length) tot = count;
else tot = taskTimes.length;
for (int i = 0; i < tot; i++) tts += taskTimes[i];
return (tot * 10_000L / tts)/10.;
}
public double totalTime() {
return total;
}
public int getCount() {
return count;
}
}

View File

@ -0,0 +1,119 @@
package nu.marginalia.loading.documents;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import lombok.SneakyThrows;
import nu.marginalia.io.processed.DocumentRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.linkdb.model.LdbUrlDetail;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.processed.DocumentRecordMetadataProjection;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
@Singleton
public class DocumentLoaderService {
private static final Logger logger = LoggerFactory.getLogger(DocumentLoaderService.class);
private final LinkdbWriter linkdbWriter;
@Inject
public DocumentLoaderService(LinkdbWriter linkdbWriter) {
this.linkdbWriter = linkdbWriter;
}
public boolean loadDocuments(
DomainIdRegistry domainIdRegistry,
ProcessHeartbeat processHeartbeat,
Path processedDataPathBase,
int untilBatch)
throws IOException, SQLException
{
var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch);
try (var taskHeartbeat = processHeartbeat.createAdHocTaskHeartbeat("DOCUMENTS")) {
int processed = 0;
for (var file : documentFiles) {
taskHeartbeat.progress("LOAD", processed++, documentFiles.size());
loadDocumentsFromFile(domainIdRegistry, file);
}
taskHeartbeat.progress("LOAD", processed, documentFiles.size());
}
logger.info("Finished");
return true;
}
private void loadDocumentsFromFile(DomainIdRegistry domainIdRegistry, Path file)
throws SQLException, IOException
{
try (var stream = DocumentRecordParquetFileReader.streamMetadataProjection(file);
LinkdbLoader loader = new LinkdbLoader(domainIdRegistry)
)
{
logger.info("Loading document meta from {}", file);
stream.forEach(loader::accept);
}
}
class LinkdbLoader implements AutoCloseable {
private final DomainIdRegistry domainIdRegistry;
private final List<LdbUrlDetail> details = new ArrayList<>(1000);
LinkdbLoader(DomainIdRegistry domainIdRegistry) {
this.domainIdRegistry = domainIdRegistry;
}
@SneakyThrows
public void accept(DocumentRecordMetadataProjection projection)
{
long urlId = UrlIdCodec.encodeId(
domainIdRegistry.getDomainId(projection.domain),
projection.ordinal
);
details.add(new LdbUrlDetail(
urlId,
new EdgeUrl(projection.url),
projection.title,
projection.description,
projection.quality,
projection.htmlStandard,
projection.htmlFeatures,
projection.pubYear,
projection.hash,
projection.getLength()
));
if (details.size() > 100) {
linkdbWriter.add(details);
details.clear();
}
}
@Override
public void close() throws SQLException {
if (!details.isEmpty()) {
linkdbWriter.add(details);
}
}
}
}

View File

@ -0,0 +1,78 @@
package nu.marginalia.loading.documents;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import nu.marginalia.io.processed.DocumentRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.loading.LoaderIndexJournalWriter;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.processed.DocumentRecordKeywordsProjection;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
@Singleton
public class KeywordLoaderService {
private static final Logger logger = LoggerFactory.getLogger(KeywordLoaderService.class);
private final LoaderIndexJournalWriter writer;
@Inject
public KeywordLoaderService(LoaderIndexJournalWriter writer) {
this.writer = writer;
}
public boolean loadKeywords(DomainIdRegistry domainIdRegistry,
ProcessHeartbeat heartbeat,
Path processedDataPathBase,
int untilBatch) throws IOException {
try (var task = heartbeat.createAdHocTaskHeartbeat("KEYWORDS")) {
var documentFiles = ProcessedDataFileNames.listDocumentFiles(processedDataPathBase, untilBatch);
int processed = 0;
for (var file : documentFiles) {
task.progress("LOAD", processed++, documentFiles.size());
loadKeywordsFromFile(domainIdRegistry, file);
}
task.progress("LOAD", processed, documentFiles.size());
}
logger.info("Finished");
return true;
}
private void loadKeywordsFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException {
try (var stream = DocumentRecordParquetFileReader.streamKeywordsProjection(file)) {
logger.info("Loading keywords from {}", file);
stream.filter(DocumentRecordKeywordsProjection::hasKeywords)
.forEach(proj -> insertKeywords(domainIdRegistry, proj));
}
}
private void insertKeywords(DomainIdRegistry domainIdRegistry,
DocumentRecordKeywordsProjection projection)
{
long combinedId = UrlIdCodec.encodeId(
domainIdRegistry.getDomainId(projection.domain),
projection.ordinal);
var words = new DocumentKeywords(
projection.words.toArray(String[]::new),
projection.metas.toArray()
);
writer.putWords(combinedId,
projection.htmlFeatures,
projection.documentMetadata,
words);
}
}

View File

@ -0,0 +1,25 @@
package nu.marginalia.loading.domains;
import java.util.HashMap;
import java.util.Map;
/** Maps domain names to domain ids */
public class DomainIdRegistry {
private final Map<String, Integer> domainIds = new HashMap<>();
public int getDomainId(String domainName) {
Integer id = domainIds.get(domainName.toLowerCase());
if (id == null) {
// This is a very severe problem
throw new IllegalStateException("Unknown domain id for domain " + domainName);
}
return id;
}
void add(String domainName, int id) {
// lower-case the key so it matches the lookup in getDomainId()
domainIds.put(domainName.toLowerCase(), id);
}
}
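An illustrative sketch (not part of the changeset) of the registry's contract, kept in the same package since add() is package-private; the domain and id are invented for the example:

package nu.marginalia.loading.domains;

class DomainIdRegistrySketch {
    public static void main(String[] args) {
        var registry = new DomainIdRegistry();

        // Normally populated by DomainLoaderService from EC_DOMAIN; 123 is a made-up id
        registry.add("example.com", 123);

        System.out.println(registry.getDomainId("example.com")); // prints 123

        // Looking up a domain that was never added throws IllegalStateException,
        // since every domain encountered during loading is expected to be registered first.
    }
}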

View File

@ -0,0 +1,97 @@
package nu.marginalia.loading.domains;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader;
import nu.marginalia.io.processed.DomainRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.model.EdgeDomain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.*;
@Singleton
public class DomainLoaderService {
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(DomainLoaderService.class);
@Inject
public DomainLoaderService(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
/** Read the domain names from each parquet file,
* compare them with the domains in the SQL database, fetch the ids
* of those that already exist, and insert those that don't.
*/
public DomainIdRegistry getOrCreateDomainIds(Path processedDataPathBase, int untilBatch)
throws IOException, SQLException
{
Collection<String> domainNamesAll = readDomainNames(processedDataPathBase, untilBatch);
return getDatabaseIds(domainNamesAll);
}
Collection<String> readDomainNames(Path processedDataPathBase, int untilBatch) throws IOException {
final Set<String> domainNamesAll = new HashSet<>(100_000);
var domainFiles = ProcessedDataFileNames.listDomainFiles(processedDataPathBase, untilBatch);
for (var file : domainFiles) {
domainNamesAll.addAll(DomainRecordParquetFileReader.getDomainNames(file));
}
var linkFiles = ProcessedDataFileNames.listDomainLinkFiles(processedDataPathBase, untilBatch);
for (var file : linkFiles) {
domainNamesAll.addAll(DomainLinkRecordParquetFileReader.getDestDomainNames(file));
}
return domainNamesAll;
}
DomainIdRegistry getDatabaseIds(Collection<String> domainNamesAll) throws SQLException {
DomainIdRegistry ret = new DomainIdRegistry();
try (var conn = dataSource.getConnection();
var insertStmt = conn.prepareStatement("""
INSERT IGNORE INTO EC_DOMAIN (DOMAIN_NAME, DOMAIN_TOP) VALUES (?, ?)
""");
var selectStmt = conn.prepareStatement("""
SELECT ID, DOMAIN_NAME FROM EC_DOMAIN WHERE DOMAIN_NAME=?
""")
) {
int i = 0;
for (var domain : domainNamesAll) {
var parsed = new EdgeDomain(domain);
insertStmt.setString(1, domain);
insertStmt.setString(2, parsed.domain);
insertStmt.addBatch();
if (++i > 1000) {
i = 0;
insertStmt.executeBatch();
}
}
if (i > 0) {
insertStmt.executeBatch();
}
for (var domain : domainNamesAll) {
selectStmt.setString(1, domain);
var rs = selectStmt.executeQuery();
if (rs.next()) {
ret.add(domain, rs.getInt(1));
}
else {
logger.error("Unknown domain {}", domain);
}
}
}
return ret;
}
}

View File

@ -0,0 +1,120 @@
package nu.marginalia.loading.links;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.io.processed.DomainLinkRecordParquetFileReader;
import nu.marginalia.io.processed.ProcessedDataFileNames;
import nu.marginalia.loading.domains.DomainIdRegistry;
import nu.marginalia.model.processed.DomainLinkRecord;
import nu.marginalia.process.control.ProcessHeartbeat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.file.Path;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@Singleton
public class DomainLinksLoaderService {
private final HikariDataSource dataSource;
private static final Logger logger = LoggerFactory.getLogger(DomainLinksLoaderService.class);
@Inject
public DomainLinksLoaderService(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
public boolean loadLinks(DomainIdRegistry domainIdRegistry,
ProcessHeartbeat heartbeat,
Path processedDataPathBase,
int untilBatch) throws IOException, SQLException {
dropLinkData();
try (var task = heartbeat.createAdHocTaskHeartbeat("LINKS")) {
var linkFiles = ProcessedDataFileNames.listDomainLinkFiles(processedDataPathBase, untilBatch);
int processed = 0;
for (var file : linkFiles) {
task.progress("LOAD", processed++, linkFiles.size());
loadLinksFromFile(domainIdRegistry, file);
}
task.progress("LOAD", processed, linkFiles.size());
}
logger.info("Finished");
return true;
}
private void dropLinkData() throws SQLException {
logger.info("Truncating EC_DOMAIN_LINK");
try (var conn = dataSource.getConnection();
var stmt = conn.createStatement()) {
stmt.executeUpdate("TRUNCATE TABLE EC_DOMAIN_LINK");
}
}
private void loadLinksFromFile(DomainIdRegistry domainIdRegistry, Path file) throws IOException, SQLException {
try (var domainStream = DomainLinkRecordParquetFileReader.stream(file);
var linkLoader = new LinkLoader(domainIdRegistry))
{
logger.info("Loading links from {}", file);
domainStream.forEach(linkLoader::accept);
}
}
class LinkLoader implements AutoCloseable {
private final Connection connection;
private final PreparedStatement insertStatement;
private final DomainIdRegistry domainIdRegistry;
private int batchSize = 0;
private int total = 0;
public LinkLoader(DomainIdRegistry domainIdRegistry) throws SQLException {
this.domainIdRegistry = domainIdRegistry;
connection = dataSource.getConnection();
insertStatement = connection.prepareStatement("""
INSERT INTO EC_DOMAIN_LINK(SOURCE_DOMAIN_ID, DEST_DOMAIN_ID)
VALUES (?, ?)
""");
}
void accept(DomainLinkRecord record) {
try {
insertStatement.setInt(1, domainIdRegistry.getDomainId(record.source));
insertStatement.setInt(2, domainIdRegistry.getDomainId(record.dest));
insertStatement.addBatch();
if (++batchSize > 1000) {
batchSize = 0;
insertStatement.executeBatch();
}
total++;
}
catch (SQLException ex) {
throw new RuntimeException(ex);
}
}
@Override
public void close() throws SQLException {
if (batchSize > 0) {
insertStatement.executeBatch();
}
logger.info("Inserted {} links", total);
insertStatement.close();
connection.close();
}
}
}

View File

@ -1,47 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.id.UrlIdCodec;
import nu.marginalia.model.idx.DocumentMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class IndexLoadKeywords {
private static final Logger logger = LoggerFactory.getLogger(IndexLoadKeywords.class);
private final LoaderIndexJournalWriter journalWriter;
private volatile boolean canceled = false;
@Inject
public IndexLoadKeywords(LoaderIndexJournalWriter journalWriter) {
this.journalWriter = journalWriter;
}
public void close() throws Exception {
if (!canceled) {
journalWriter.close();
}
}
public void load(LoaderData loaderData,
int ordinal,
EdgeUrl url,
int features,
DocumentMetadata metadata,
DocumentKeywords words) {
long combinedId = UrlIdCodec.encodeId(loaderData.getTargetDomainId(), ordinal);
if (combinedId <= 0) {
logger.warn("Failed to get IDs for {} -- c={}", url, combinedId);
return;
}
journalWriter.putWords(combinedId,
features,
metadata,
words);
}
}

View File

@ -1,83 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import nu.marginalia.linkdb.LinkdbStatusWriter;
import nu.marginalia.linkdb.LinkdbWriter;
import nu.marginalia.linkdb.model.LdbUrlDetail;
import nu.marginalia.linkdb.model.UrlStatus;
import nu.marginalia.model.id.UrlIdCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class LdbLoadProcessedDocument {
private static final Logger logger = LoggerFactory.getLogger(LdbLoadProcessedDocument.class);
private final LinkdbWriter linkdbWriter;
private final LinkdbStatusWriter linkdbStatusWriter;
@Inject
public LdbLoadProcessedDocument(LinkdbWriter linkdbWriter,
LinkdbStatusWriter linkdbStatusWriter
) {
this.linkdbWriter = linkdbWriter;
this.linkdbStatusWriter = linkdbStatusWriter;
}
public void load(LoaderData data, List<LoadProcessedDocument> documents) {
var details = new ArrayList<LdbUrlDetail>();
int domainId = data.getTargetDomainId();
var statusList = new ArrayList<UrlStatus>();
for (var document : documents) {
long id = UrlIdCodec.encodeId(domainId, document.ordinal());
details.add(new LdbUrlDetail(
id,
document.url(),
document.title(),
document.description(),
document.quality(),
document.standard(),
document.htmlFeatures(),
document.pubYear(),
document.hash(),
document.length()
));
statusList.add(new UrlStatus(id, document.url(), document.state().toString(), null));
}
try {
linkdbWriter.add(details);
}
catch (SQLException ex) {
logger.warn("Failed to add processed documents to linkdb", ex);
}
}
public void loadWithError(LoaderData data, List<LoadProcessedDocumentWithError> documents) {
var statusList = new ArrayList<UrlStatus>();
int domainId = data.getTargetDomainId();
for (var document : documents) {
statusList.add(new UrlStatus(
UrlIdCodec.encodeId(domainId, document.ordinal()),
document.url(),
document.state().toString(),
document.reason()
));
}
try {
linkdbStatusWriter.add(statusList);
}
catch (SQLException ex) {
logger.warn("Failed to add processed documents to linkdb", ex);
}
}
}

View File

@@ -1,118 +0,0 @@
package nu.marginalia.loading.loader;
import nu.marginalia.keyword.model.DocumentKeywords;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.EdgeUrl;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.model.idx.DocumentMetadata;
import nu.marginalia.converting.instruction.Interpreter;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocument;
import nu.marginalia.converting.instruction.instructions.LoadProcessedDocumentWithError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
public class Loader implements Interpreter, AutoCloseable {
private final SqlLoadDomains sqlLoadDomains;
private final SqlLoadDomainLinks sqlLoadDomainLinks;
private final SqlLoadProcessedDomain sqlLoadProcessedDomain;
private final LdbLoadProcessedDocument loadProcessedDocument;
private final SqlLoadDomainMetadata sqlLoadDomainMetadata;
private final IndexLoadKeywords indexLoadKeywords;
private static final Logger logger = LoggerFactory.getLogger(Loader.class);
private final List<LoadProcessedDocument> processedDocumentList;
private final List<LoadProcessedDocumentWithError> processedDocumentWithErrorList;
public final LoaderData data;
public Loader(int sizeHint,
SqlLoadDomains sqlLoadDomains,
SqlLoadDomainLinks sqlLoadDomainLinks,
SqlLoadProcessedDomain sqlLoadProcessedDomain,
LdbLoadProcessedDocument loadProcessedDocument,
SqlLoadDomainMetadata sqlLoadDomainMetadata,
IndexLoadKeywords indexLoadKeywords) {
data = new LoaderData(sizeHint);
this.sqlLoadDomains = sqlLoadDomains;
this.sqlLoadDomainLinks = sqlLoadDomainLinks;
this.sqlLoadProcessedDomain = sqlLoadProcessedDomain;
this.loadProcessedDocument = loadProcessedDocument;
this.sqlLoadDomainMetadata = sqlLoadDomainMetadata;
this.indexLoadKeywords = indexLoadKeywords;
processedDocumentList = new ArrayList<>(sizeHint);
processedDocumentWithErrorList = new ArrayList<>(sizeHint);
}
@Override
public void loadDomain(EdgeDomain[] domains) {
sqlLoadDomains.load(data, domains);
}
@Override
public void loadRssFeed(EdgeUrl[] rssFeed) {
logger.debug("loadRssFeed({})", rssFeed, null);
}
@Override
public void loadDomainLink(DomainLink[] links) {
sqlLoadDomainLinks.load(data, links);
}
@Override
public void loadProcessedDomain(EdgeDomain domain, DomainIndexingState state, String ip) {
sqlLoadProcessedDomain.load(data, domain, state, ip);
}
@Override
public void loadProcessedDocument(LoadProcessedDocument document) {
processedDocumentList.add(document);
if (processedDocumentList.size() > 1000) {
loadProcessedDocument.load(data, processedDocumentList);
processedDocumentList.clear();
}
}
@Override
public void loadProcessedDocumentWithError(LoadProcessedDocumentWithError document) {
processedDocumentWithErrorList.add(document);
if (processedDocumentWithErrorList.size() > 1000) {
loadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
processedDocumentWithErrorList.clear();
}
}
@Override
public void loadKeywords(EdgeUrl url, int ordinal, int features, DocumentMetadata metadata, DocumentKeywords words) {
indexLoadKeywords.load(data, ordinal, url, features, metadata, words);
}
@Override
public void loadDomainRedirect(DomainLink link) {
sqlLoadProcessedDomain.loadAlias(data, link);
}
@Override
public void loadDomainMetadata(EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {
sqlLoadDomainMetadata.load(data, domain, knownUrls, goodUrls, visitedUrls);
}
public void close() {
if (processedDocumentList.size() > 0) {
loadProcessedDocument.load(data, processedDocumentList);
}
if (processedDocumentWithErrorList.size() > 0) {
loadProcessedDocument.loadWithError(data, processedDocumentWithErrorList);
}
}
}

View File

@@ -1,38 +0,0 @@
package nu.marginalia.loading.loader;
import gnu.trove.map.hash.TObjectIntHashMap;
import nu.marginalia.model.EdgeDomain;
public class LoaderData {
private final TObjectIntHashMap<EdgeDomain> domainIds;
private EdgeDomain targetDomain;
public final int sizeHint;
private int targetDomainId = -1;
public LoaderData(int sizeHint) {
domainIds = new TObjectIntHashMap<>(10);
this.sizeHint = sizeHint;
}
public void setTargetDomain(EdgeDomain domain) {
this.targetDomain = domain;
}
public EdgeDomain getTargetDomain() {
return targetDomain;
}
public int getTargetDomainId() {
if (targetDomainId < 0)
targetDomainId = domainIds.get(targetDomain);
return targetDomainId;
}
public void addDomain(EdgeDomain domain, int id) {
domainIds.put(domain, id);
}
public int getDomainId(EdgeDomain domain) {
return domainIds.get(domain);
}
}
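
LoaderData is essentially a domain-to-id lookup with a cached id for the domain currently being loaded. A short usage sketch follows; the domain name and id are illustrative values, and the example assumes EdgeDomain can be constructed from a host name string.

```java
import nu.marginalia.loading.loader.LoaderData;
import nu.marginalia.model.EdgeDomain;

// Hypothetical usage of the LoaderData lookup table defined above.
class LoaderDataExample {
    public static void main(String[] args) {
        var data = new LoaderData(100);                  // sizeHint for the batch being loaded
        var domain = new EdgeDomain("www.example.com");  // illustrative domain

        data.setTargetDomain(domain);
        data.addDomain(domain, 42);                      // id as assigned in EC_DOMAIN

        // Looks up and caches the target domain's id; subsequent calls reuse the cached value.
        int id = data.getTargetDomainId();
        System.out.println(domain + " -> " + id);
    }
}
```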

View File

@@ -1,31 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
public class LoaderFactory {
private final SqlLoadDomains sqlLoadDomains;
private final SqlLoadDomainLinks sqlLoadDomainLinks;
private final SqlLoadProcessedDomain sqlLoadProcessedDomain;
private final LdbLoadProcessedDocument sqlLoadProcessedDocument;
private final SqlLoadDomainMetadata sqlLoadDomainMetadata;
private final IndexLoadKeywords indexLoadKeywords;
@Inject
public LoaderFactory(SqlLoadDomains sqlLoadDomains,
SqlLoadDomainLinks sqlLoadDomainLinks,
SqlLoadProcessedDomain sqlLoadProcessedDomain,
LdbLoadProcessedDocument sqlLoadProcessedDocument,
SqlLoadDomainMetadata sqlLoadDomainMetadata,
IndexLoadKeywords indexLoadKeywords) {
this.sqlLoadDomains = sqlLoadDomains;
this.sqlLoadDomainLinks = sqlLoadDomainLinks;
this.sqlLoadProcessedDomain = sqlLoadProcessedDomain;
this.sqlLoadProcessedDocument = sqlLoadProcessedDocument;
this.sqlLoadDomainMetadata = sqlLoadDomainMetadata;
this.indexLoadKeywords = indexLoadKeywords;
}
public Loader create(int sizeHint) {
return new Loader(sizeHint, sqlLoadDomains, sqlLoadDomainLinks, sqlLoadProcessedDomain, sqlLoadProcessedDocument, sqlLoadDomainMetadata, indexLoadKeywords);
}
}

View File

@@ -1,84 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import static java.sql.Statement.SUCCESS_NO_INFO;
public class SqlLoadDomainLinks {
private final HikariDataSource dataSource;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadDomainLinks.class);
@Inject
public SqlLoadDomainLinks(HikariDataSource dataSource) {
this.dataSource = dataSource;
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.createStatement()) {
stmt.execute("DROP PROCEDURE IF EXISTS INSERT_LINK");
stmt.execute("""
CREATE PROCEDURE INSERT_LINK (
IN FROM_DOMAIN VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci,
IN TO_DOMAIN VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci
)
BEGIN
INSERT IGNORE INTO EC_DOMAIN_LINK (SOURCE_DOMAIN_ID, DEST_DOMAIN_ID)
SELECT SOURCE.ID,DEST.ID
FROM EC_DOMAIN SOURCE INNER JOIN EC_DOMAIN DEST
ON SOURCE.DOMAIN_NAME=FROM_DOMAIN AND DEST.DOMAIN_NAME=TO_DOMAIN;
END
""");
}
}
catch (SQLException ex) {
throw new RuntimeException("Failed to set up loader", ex);
}
}
public void load(LoaderData data, DomainLink[] links) {
try (var connection = dataSource.getConnection();
var nukeExistingLinksForDomain =
connection.prepareStatement("""
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=?
""");
var stmt =
connection.prepareCall("CALL INSERT_LINK(?,?)"))
{
connection.setAutoCommit(false);
nukeExistingLinksForDomain.setInt(1, data.getDomainId(links[0].from()));
nukeExistingLinksForDomain.executeUpdate();
for (DomainLink link : links) {
stmt.setString(1, link.from().toString());
stmt.setString(2, link.to().toString());
stmt.addBatch();
}
var ret = stmt.executeBatch();
for (int rv = 0; rv < links.length; rv++) {
if (ret[rv] != 1 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", links[rv], ret[rv]);
}
}
connection.commit();
connection.setAutoCommit(true);
}
catch (SQLException ex) {
logger.warn("SQL error inserting domain links", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
}

View File

@@ -1,41 +0,0 @@
package nu.marginalia.loading.loader;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.model.EdgeDomain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import java.sql.SQLException;
public class SqlLoadDomainMetadata {
private final HikariDataSource dataSource;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Inject
public SqlLoadDomainMetadata(HikariDataSource dataSource) {
this.dataSource = dataSource;
}
public void load(LoaderData data, EdgeDomain domain, int knownUrls, int goodUrls, int visitedUrls) {
int domainId = data.getDomainId(domain);
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
INSERT INTO DOMAIN_METADATA(ID,KNOWN_URLS,VISITED_URLS,GOOD_URLS) VALUES (?, ?, ?, ?)
"""
))
{
stmt.setInt(1, domainId);
stmt.setInt(2, knownUrls);
stmt.setInt(3, visitedUrls);
stmt.setInt(4, goodUrls);
stmt.executeUpdate();
} catch (SQLException ex) {
logger.warn("SQL error inserting domains", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
}

View File

@@ -1,163 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.model.EdgeDomain;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import static java.sql.Statement.SUCCESS_NO_INFO;
public class SqlLoadDomains {
private final HikariDataSource dataSource;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadDomains.class);
@Inject
public SqlLoadDomains(HikariDataSource dataSource) {
this.dataSource = dataSource;
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.createStatement()) {
stmt.execute("DROP PROCEDURE IF EXISTS INSERT_DOMAIN");
stmt.execute("""
CREATE PROCEDURE INSERT_DOMAIN (
IN DOMAIN_NAME VARCHAR(255),
IN TOP_DOMAIN VARCHAR(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci)
BEGIN
INSERT IGNORE INTO EC_DOMAIN(DOMAIN_NAME, DOMAIN_TOP) VALUES (DOMAIN_NAME, TOP_DOMAIN);
END
""");
}
}
catch (SQLException ex) {
throw new RuntimeException("Failed to set up loader", ex);
}
}
public void load(LoaderData data, EdgeDomain domain) {
try (var connection = dataSource.getConnection()) {
try (var insertCall = connection.prepareCall("CALL INSERT_DOMAIN(?,?)")) {
connection.setAutoCommit(false);
insertCall.setString(1, domain.toString());
insertCall.setString(2, domain.domain);
var ret = insertCall.executeUpdate();
connection.commit();
if (ret < 0) {
logger.warn("load({}) -- bad return status {}", domain, ret);
}
findIdForDomain(connection, data, domain);
connection.setAutoCommit(true);
}
}
catch (SQLException ex) {
logger.warn("SQL error inserting domain", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
public void load(LoaderData data, EdgeDomain[] domains) {
try (var connection = dataSource.getConnection()) {
connection.setAutoCommit(false);
try (var insertCall = connection.prepareCall("CALL INSERT_DOMAIN(?,?)")) {
int cnt = 0; int batchOffset = 0;
for (var domain : domains) {
insertCall.setString(1, domain.toString());
insertCall.setString(2, domain.domain);
insertCall.addBatch();
if (++cnt == 1000) {
var ret = insertCall.executeBatch();
connection.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", domains[batchOffset + rv], ret[rv]);
}
}
cnt = 0;
batchOffset += 1000;
}
}
if (cnt > 0) {
var ret = insertCall.executeBatch();
connection.commit();
for (int rv = 0; rv < cnt; rv++) {
if (ret[rv] < 0 && ret[rv] != SUCCESS_NO_INFO) {
logger.warn("load({}) -- bad row count {}", domains[batchOffset + rv], ret[rv]);
}
}
}
}
connection.commit();
connection.setAutoCommit(true);
findIdForDomain(connection, data, domains);
}
catch (SQLException ex) {
logger.warn("SQL error inserting domains", ex);
}
}
void findIdForDomain(Connection connection, LoaderData data, EdgeDomain... domains) {
if (data.getTargetDomain() == null || data.getDomainId(data.getTargetDomain()) > 0) {
return;
}
try (var query = connection.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=?"))
{
for (var domain : domains) {
if (data.getDomainId(domain) > 0)
continue;
query.setString(1, domain.toString());
var rsp = query.executeQuery();
if (rsp.next()) {
data.addDomain(domain, rsp.getInt(1));
} else {
logger.warn("load() -- could not find ID for target domain {}", domain);
}
}
}
catch (SQLException ex) {
logger.warn("SQL error finding id for domain", ex);
}
}
void loadAdditionalDomains(Connection connection, LoaderData data, EdgeDomain[] domains) {
try (var query = connection.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=?"))
{
for (var domain : domains) {
if (data.getDomainId(domain) == 0) continue;
query.setString(1, domain.toString());
var rsp = query.executeQuery();
if (rsp.next()) {
data.addDomain(domain, rsp.getInt(1));
} else {
logger.warn("load() -- could not find ID for target domain {}", domain);
}
}
}
catch (SQLException ex) {
logger.warn("SQL error finding id for domain", ex);
}
}
}

View File

@@ -1,104 +0,0 @@
package nu.marginalia.loading.loader;
import com.google.inject.Inject;
import com.zaxxer.hikari.HikariDataSource;
import nu.marginalia.model.crawl.DomainIndexingState;
import nu.marginalia.converting.instruction.instructions.DomainLink;
import nu.marginalia.model.EdgeDomain;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
public class SqlLoadProcessedDomain {
private final HikariDataSource dataSource;
private final SqlLoadDomains loadDomains;
private static final Logger logger = LoggerFactory.getLogger(SqlLoadProcessedDomain.class);
@Inject
public SqlLoadProcessedDomain(HikariDataSource dataSource, SqlLoadDomains loadDomains) {
this.dataSource = dataSource;
this.loadDomains = loadDomains;
try (var conn = dataSource.getConnection()) {
try (var stmt = conn.createStatement()) {
stmt.execute("DROP PROCEDURE IF EXISTS INITIALIZE_DOMAIN");
stmt.execute("""
CREATE PROCEDURE INITIALIZE_DOMAIN (
IN ST ENUM('ACTIVE', 'EXHAUSTED', 'SPECIAL', 'SOCIAL_MEDIA', 'BLOCKED', 'REDIR', 'ERROR', 'UNKNOWN'),
IN IDX INT,
IN DID INT,
IN IP VARCHAR(48))
BEGIN
DELETE FROM DOMAIN_METADATA WHERE ID=DID;
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
UPDATE EC_DOMAIN SET INDEX_DATE=NOW(), STATE=ST, DOMAIN_ALIAS=NULL, INDEXED=GREATEST(INDEXED,IDX), IP=IP WHERE ID=DID;
DELETE FROM EC_DOMAIN_LINK WHERE SOURCE_DOMAIN_ID=DID;
END
""");
}
}
catch (SQLException ex) {
throw new RuntimeException("Failed to set up loader", ex);
}
}
public void load(LoaderData data, EdgeDomain domain, DomainIndexingState state, String ip) {
data.setTargetDomain(domain);
loadDomains.load(data, domain);
try (var conn = dataSource.getConnection()) {
try (var initCall = conn.prepareCall("CALL INITIALIZE_DOMAIN(?,?,?,?)")) {
initCall.setString(1, state.name());
initCall.setInt(2, 1 + data.sizeHint / 100);
initCall.setInt(3, data.getDomainId(domain));
initCall.setString(4, StringUtils.truncate(ip, 48));
int rc = initCall.executeUpdate();
conn.commit();
if (rc < 1) {
logger.warn("load({},{}) -- bad rowcount {}", domain, state, rc);
}
}
catch (SQLException ex) {
conn.rollback();
throw ex;
}
}
catch (SQLException ex) {
logger.warn("SQL error initializing domain", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
public void loadAlias(LoaderData data, DomainLink link) {
try (var conn = dataSource.getConnection();
var stmt = conn.prepareStatement("""
UPDATE EC_DOMAIN TARGET
INNER JOIN EC_DOMAIN ALIAS ON ALIAS.DOMAIN_NAME=?
SET TARGET.DOMAIN_ALIAS=ALIAS.ID
WHERE TARGET.DOMAIN_NAME=?
""")) {
stmt.setString(1, link.to().toString());
stmt.setString(2, link.from().toString());
int rc = stmt.executeUpdate();
conn.commit();
if (rc != 1) {
logger.warn("loadAlias({}) - unexpected row count {}", link, rc);
}
}
catch (SQLException ex) {
logger.warn("SQL error inserting domain alias", ex);
if (getClass().desiredAssertionStatus())
throw new RuntimeException(ex);
}
}
}

Some files were not shown because too many files have changed in this diff.