Restructuring the git repo

This commit is contained in:
Viktor Lofgren 2023-03-04 13:19:01 +01:00
parent 9c665bbc74
commit 4fdaaa16ba
1234 changed files with 11606 additions and 14306 deletions

3
.gitignore vendored
View File

@ -4,3 +4,6 @@ build/
*~ *~
.gradle/ .gradle/
.idea/ .idea/
lombok.config
Dockerfile
run

View File

@ -1,31 +1,38 @@
# marginalia.nu # marginalia.nu
This is the source code for marginalia.nu, including the [search engine](https://search.marginalia.nu), This is the source code for [Marginalia Search](https://search.marginalia.nu).
the [MEMEX/gemini server](https://memex.marginalia.nu), the and the [encyclopedia service](https://encyclopedia.marginalia.nu).
The aim of the project is to develop new and alternative discovery methods for the Internet. The aim of the project is to develop new and alternative discovery methods for the Internet.
It's an experimental workshop as much as it is a public service, the overarching goal is to It's an experimental workshop as much as it is a public service, the overarching goal is to
elevate the more human, non-commercial sides of the Internet. A side-goal is to do this without elevate the more human, non-commercial sides of the Internet. A side-goal is to do this without
requiring datacenters and expensive enterprise hardware, to run this operation on affordable hardware. requiring datacenters and expensive enterprise hardware, to run this operation on affordable hardware.
The canonical git server for this project is [https://git.marginalia.nu](https://git.marginalia.nu). ## Set up instructions
It is fine to mirror it on other hosts, but if you have issues or questions
git.marginalia.nu is where you want to go.
## Important note about wmsa.local For local development, you're strongly encouraged to use docker or podman.
From a fresh to running system, you'll need to do this:
This project has a [sister repository called wmsa.local](https://git.marginalia.nu/marginalia/wmsa.local) ```
that contains scripts and configuration files for running and developing the code. $ ./gradlew assemble
Without it, development is very unpleasant. $ ./gradlew docker
While developing the code, you will want an environment variable WMSA_HOME pointing to $ vim run/settings.profile
the directory in which wmsa.local is checked out, otherwise the code will not run and
several tests will fail. (follow instructions in file)
$ run/setup.sh
$ run/reconvert.sh
$ docker-compose up
```
Wait a moment and check out [https://localhost:8080](https://localhost:8080).
## Documentation ## Documentation
Documentation is a work in progress. See the [wiki](https://git.marginalia.nu/marginalia/marginalia.nu/wiki). Documentation is a work in progress.
## Contributing ## Contributing

View File

@ -0,0 +1,45 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':third-party')
implementation project(':protocol')
implementation project(':common:model')
implementation project(':common:config')
implementation project(':common:service-discovery')
implementation project(':common:service-client')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.prometheus
implementation libs.notnull
implementation libs.guice
implementation libs.rxjava
implementation libs.gson
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -1,28 +1,32 @@
package nu.marginalia.wmsa.edge.assistant.client; package nu.marginalia.assistant.client;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.wmsa.client.AbstractDynamicClient; import nu.marginalia.assistant.client.model.DictionaryResponse;
import nu.marginalia.wmsa.client.exception.RouteNotConfiguredException; import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.wmsa.configuration.ServiceDescriptor; import nu.marginalia.client.exception.RouteNotConfiguredException;
import nu.marginalia.wmsa.configuration.server.Context; import nu.marginalia.WmsaHome;
import nu.marginalia.wmsa.edge.assistant.dict.DictionaryResponse; import nu.marginalia.model.gson.GsonFactory;
import org.eclipse.jetty.util.UrlEncoded; import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.client.Context;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.List; import java.util.List;
@Singleton @Singleton
public class AssistantClient extends AbstractDynamicClient { public class AssistantClient extends AbstractDynamicClient {
@Inject @Inject
public AssistantClient() { public AssistantClient(ServiceDescriptors descriptors) {
super(ServiceDescriptor.EDGE_ASSISTANT); super(descriptors.forId(ServiceId.Assistant), WmsaHome.getHostsFile(), GsonFactory::get);
} }
public Observable<DictionaryResponse> dictionaryLookup(Context ctx, String word) { public Observable<DictionaryResponse> dictionaryLookup(Context ctx, String word) {
try { try {
return super.get(ctx, "/dictionary/" + UrlEncoded.encodeString(word), DictionaryResponse.class); return super.get(ctx, "/dictionary/" + URLEncoder.encode(word, StandardCharsets.UTF_8), DictionaryResponse.class);
} }
catch (RouteNotConfiguredException ex) { catch (RouteNotConfiguredException ex) {
return Observable.empty(); return Observable.empty();
@ -32,7 +36,7 @@ public class AssistantClient extends AbstractDynamicClient {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public Observable<List<String>> spellCheck(Context ctx, String word) { public Observable<List<String>> spellCheck(Context ctx, String word) {
try { try {
return (Observable<List<String>>) (Object) super.get(ctx, "/spell-check/" + UrlEncoded.encodeString(word), List.class); return (Observable<List<String>>) (Object) super.get(ctx, "/spell-check/" + URLEncoder.encode(word, StandardCharsets.UTF_8), List.class);
} }
catch (RouteNotConfiguredException ex) { catch (RouteNotConfiguredException ex) {
return Observable.empty(); return Observable.empty();
@ -49,7 +53,7 @@ public class AssistantClient extends AbstractDynamicClient {
public Observable<String> evalMath(Context ctx, String expression) { public Observable<String> evalMath(Context ctx, String expression) {
try { try {
return super.get(ctx, "/eval-expression?value=" + UrlEncoded.encodeString(expression)); return super.get(ctx, "/eval-expression?value=" + URLEncoder.encode(expression, StandardCharsets.UTF_8));
} }
catch (RouteNotConfiguredException ex) { catch (RouteNotConfiguredException ex) {
return Observable.empty(); return Observable.empty();

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.assistant.dict; package nu.marginalia.assistant.client.model;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.assistant.dict; package nu.marginalia.assistant.client.model;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

View File

@ -0,0 +1,48 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':third-party')
implementation project(':protocol')
implementation project(':common:model')
implementation project(':common:config')
implementation project(':common:service-discovery')
implementation project(':common:service-client')
implementation project(':index:index-query')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.prometheus
implementation libs.notnull
implementation libs.guice
implementation libs.rxjava
implementation libs.protobuf
implementation libs.gson
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,45 @@
package nu.marginalia.index.client;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.prometheus.client.Summary;
import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.WmsaHome;
import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.client.Context;
import nu.marginalia.index.client.model.query.EdgeSearchSpecification;
import nu.marginalia.index.client.model.results.EdgeSearchResultItem;
import nu.marginalia.index.client.model.results.EdgeSearchResultSet;
import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import javax.annotation.CheckReturnValue;
import java.util.List;
@Singleton
public class EdgeIndexClient extends AbstractDynamicClient {
private static final Summary wmsa_search_index_api_time = Summary.build().name("wmsa_search_index_api_time").help("-").register();
@Inject
public EdgeIndexClient(ServiceDescriptors descriptors) {
super(descriptors.forId(ServiceId.Index), WmsaHome.getHostsFile(), GsonFactory::get);
setTimeout(30);
}
@CheckReturnValue
public List<EdgeSearchResultItem> query(Context ctx, EdgeSearchSpecification specs) {
return wmsa_search_index_api_time.time(
() -> this.postGet(ctx, "/search/", specs, EdgeSearchResultSet.class).blockingFirst().getResults()
);
}
@CheckReturnValue
public Observable<Boolean> isBlocked(Context ctx) {
return super.get(ctx, "/is-blocked", Boolean.class);
}
}

View File

@ -1,10 +1,10 @@
package nu.marginalia.wmsa.edge.model.search.domain; package nu.marginalia.index.client.model.domain;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import lombok.ToString; import lombok.ToString;
import nu.marginalia.wmsa.edge.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.wmsa.edge.model.id.EdgeIdList; import nu.marginalia.model.id.EdgeIdList;
@AllArgsConstructor @Getter @ToString @AllArgsConstructor @Getter @ToString
public class EdgeDomainSearchResults { public class EdgeDomainSearchResults {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.search.domain; package nu.marginalia.index.client.model.domain;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.ToString; import lombok.ToString;

View File

@ -1,10 +1,9 @@
package nu.marginalia.wmsa.edge.model.search; package nu.marginalia.index.client.model.query;
import lombok.*; import lombok.*;
import nu.marginalia.wmsa.edge.index.model.QueryLimits; import nu.marginalia.index.query.limit.QueryLimits;
import nu.marginalia.wmsa.edge.index.model.QueryStrategy; import nu.marginalia.index.query.limit.QueryStrategy;
import nu.marginalia.wmsa.edge.index.svc.searchset.SearchSetIdentifier; import nu.marginalia.index.query.limit.SpecificationLimit;
import nu.marginalia.wmsa.edge.model.search.domain.SpecificationLimit;
import java.util.List; import java.util.List;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.search; package nu.marginalia.index.client.model.query;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

View File

@ -1,6 +1,4 @@
package nu.marginalia.wmsa.edge.index.svc.searchset; package nu.marginalia.index.client.model.query;
import nu.marginalia.wmsa.edge.search.model.EdgeSearchProfile;
/** Identifies a RankingSearchSet, associated with an EdgeSearchProfile /** Identifies a RankingSearchSet, associated with an EdgeSearchProfile
* *

View File

@ -1,9 +1,9 @@
package nu.marginalia.wmsa.edge.model.search; package nu.marginalia.index.client.model.results;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;
import nu.marginalia.wmsa.edge.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.wmsa.edge.model.id.EdgeId; import nu.marginalia.model.id.EdgeId;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -41,7 +41,7 @@ public class EdgeSearchResultItem {
return scoreValue; return scoreValue;
} }
private transient int domainId = 0; private transient int domainId = Integer.MIN_VALUE;
public void setDomainId(int domainId) { public void setDomainId(int domainId) {
this.domainId = domainId; this.domainId = domainId;
} }
@ -69,12 +69,12 @@ public class EdgeSearchResultItem {
} }
public long deduplicationKey() { public long deduplicationKey() {
final int ranking = getDomainId(); final int domainId = getDomainId();
if (ranking == Integer.MAX_VALUE || ranking == Integer.MIN_VALUE) { if (domainId == Integer.MAX_VALUE || domainId == Integer.MIN_VALUE) {
return 0; return 0;
} }
return ranking; return domainId;
} }
} }

View File

@ -1,9 +1,9 @@
package nu.marginalia.wmsa.edge.model.search; package nu.marginalia.index.client.model.results;
import nu.marginalia.wmsa.edge.index.model.EdgePageDocumentFlags; import nu.marginalia.model.crawl.EdgePageWordFlags;
import nu.marginalia.wmsa.edge.index.model.EdgePageDocumentsMetadata; import nu.marginalia.model.idx.EdgePageWordMetadata;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordFlags; import nu.marginalia.model.crawl.EdgePageDocumentFlags;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordMetadata; import nu.marginalia.model.idx.EdgePageDocumentsMetadata;
import static java.lang.Integer.lowestOneBit; import static java.lang.Integer.lowestOneBit;
import static java.lang.Integer.numberOfTrailingZeros; import static java.lang.Integer.numberOfTrailingZeros;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.search; package nu.marginalia.index.client.model.results;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.search; package nu.marginalia.index.client.model.results;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

7
api/readme.md Normal file
View File

@ -0,0 +1,7 @@
# Core Service Clients
These are clients for the [core services](../services-core/), along with what models
are necessary for speaking to them.
All that is necessary is to `@Inject` them into the constructor and then
requests can be sent.

View File

@ -0,0 +1,46 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':third-party')
implementation project(':protocol')
implementation project(':common:model')
implementation project(':common:config')
implementation project(':common:service-discovery')
implementation project(':common:service-client')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.prometheus
implementation libs.notnull
implementation libs.guice
implementation libs.rxjava
implementation libs.gson
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
useJUnitPlatform()
}
task fastTests(type: Test) {
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -1,11 +1,15 @@
package nu.marginalia.wmsa.edge.search.client; package nu.marginalia.search.client;
import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observable;
import nu.marginalia.wmsa.api.model.ApiSearchResults; import nu.marginalia.client.AbstractDynamicClient;
import nu.marginalia.wmsa.client.AbstractDynamicClient; import nu.marginalia.model.gson.GsonFactory;
import nu.marginalia.wmsa.configuration.ServiceDescriptor; import nu.marginalia.search.client.model.ApiSearchResults;
import nu.marginalia.wmsa.configuration.server.Context; import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.WmsaHome;
import nu.marginalia.client.Context;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -17,8 +21,9 @@ import java.nio.charset.StandardCharsets;
public class EdgeSearchClient extends AbstractDynamicClient { public class EdgeSearchClient extends AbstractDynamicClient {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
public EdgeSearchClient() { @Inject
super(ServiceDescriptor.EDGE_SEARCH); public EdgeSearchClient(ServiceDescriptors descriptors) {
super(descriptors.forId(ServiceId.Search), WmsaHome.getHostsFile(), GsonFactory::get);
} }
@CheckReturnValue @CheckReturnValue

View File

@ -0,0 +1,18 @@
package nu.marginalia.search.client.model;
import lombok.AllArgsConstructor;
import lombok.Getter;
import java.util.ArrayList;
import java.util.List;
@AllArgsConstructor @Getter
public class ApiSearchResult {
public String url;
public String title;
public String description;
public double quality;
public List<List<ApiSearchResultQueryDetails>> details = new ArrayList<>();
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.api.model; package nu.marginalia.search.client.model;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.api.model; package nu.marginalia.search.client.model;
import lombok.AllArgsConstructor; import lombok.AllArgsConstructor;
import lombok.Getter; import lombok.Getter;

View File

@ -1,42 +1,16 @@
plugins { plugins {
id 'java' id 'java'
id 'com.github.johnrengelman.shadow' version '6.0.0'
} }
group 'nu.marginalia' group 'marginalia'
version 'SNAPSHOT' version 'SNAPSHOT'
compileJava.options.encoding = "UTF-8" compileJava.options.encoding = "UTF-8"
compileTestJava.options.encoding = "UTF-8" compileTestJava.options.encoding = "UTF-8"
repositories {
mavenLocal()
maven { url "https://artifactory.cronapp.io/public-release/" }
maven { url "https://repo1.maven.org/maven2/" }
maven { url "https://www2.ph.ed.ac.uk/maven2/" }
maven { url "https://jitpack.io/" }
exclusiveContent {
forRepository {
maven {
url = uri("https://jitpack.io")
}
}
filter {
// Only use JitPack for the `gson-record-type-adapter-factory` library
includeModule("com.github.Marcono1234", "gson-record-type-adapter-factory")
}
}
}
shadowJar { task dist(type: Copy) {
zip64 true from subprojects.collect { it.tasks.withType(Tar) }
} into "$buildDir/dist"
jar {
manifest {
attributes 'Main-Class': "nu.marginalia.wmsa.configuration.ServiceDescriptor"
}
from {
configurations.shadow.collect { it.isDirectory() ? it : zipTree(it) }
}
} }
java { java {
@ -44,19 +18,3 @@ java {
languageVersion.set(JavaLanguageVersion.of(17)) languageVersion.set(JavaLanguageVersion.of(17))
} }
} }
dependencies {
implementation project(':marginalia_nu')
}
task version() { //
}
test {
maxParallelForks = 16
forkEvery = 1
maxHeapSize = "8G"
useJUnitPlatform {
excludeTags "nobuild"
}
}

View File

@ -0,0 +1,32 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':common:service-discovery')
implementation project(':common:service-client')
implementation project(':libraries:misc')
}
test {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
maxHeapSize = "8G"
useJUnitPlatform()
}
task fastTests(type: Test) {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
maxHeapSize = "8G"
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -0,0 +1,22 @@
package nu.marginalia;
import java.nio.file.Path;
public class LanguageModels {
public final Path ngramBloomFilter;
public final Path termFrequencies;
public final Path openNLPSentenceDetectionData;
public final Path posRules;
public final Path posDict;
public final Path openNLPTokenData;
public LanguageModels(Path ngramBloomFilter, Path termFrequencies, Path openNLPSentenceDetectionData, Path posRules, Path posDict, Path openNLPTokenData) {
this.ngramBloomFilter = ngramBloomFilter;
this.termFrequencies = termFrequencies;
this.openNLPSentenceDetectionData = openNLPSentenceDetectionData;
this.posRules = posRules;
this.posDict = posDict;
this.openNLPTokenData = openNLPTokenData;
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration; package nu.marginalia;
public record UserAgent(String uaString) { public record UserAgent(String uaString) {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration; package nu.marginalia;
public record WebsiteUrl(String url) { public record WebsiteUrl(String url) {
public String withPath(String path) { public String withPath(String path) {

View File

@ -1,6 +1,7 @@
package nu.marginalia.wmsa.configuration; package nu.marginalia;
import nu.marginalia.util.language.conf.LanguageModels;
import nu.marginalia.service.descriptor.HostsFile;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;

53
common/model/build.gradle Normal file
View File

@ -0,0 +1,53 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':common:service-discovery')
implementation project(':common:service-client')
implementation project(':libraries:misc')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.guice
implementation libs.bundles.gson
implementation libs.notnull
implementation libs.commons.lang3
implementation libs.trove
implementation libs.fastutil
implementation libs.rxjava
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
maxHeapSize = "8G"
useJUnitPlatform()
}
task fastTests(type: Test) {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
maxHeapSize = "8G"
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model; package nu.marginalia.model;
import lombok.*; import lombok.*;
@ -10,8 +10,6 @@ import java.util.regex.Pattern;
@AllArgsConstructor @AllArgsConstructor
@Getter @Setter @Builder @Getter @Setter @Builder
public class EdgeDomain { public class EdgeDomain {
@Nonnull @Nonnull
public final String subDomain; public final String subDomain;
@Nonnull @Nonnull
@ -109,6 +107,7 @@ public class EdgeDomain {
} }
return domain.substring(0, cutPoint).toLowerCase(); return domain.substring(0, cutPoint).toLowerCase();
} }
public String getLongDomainKey() { public String getLongDomainKey() {
StringBuilder ret = new StringBuilder(); StringBuilder ret = new StringBuilder();

View File

@ -1,9 +1,9 @@
package nu.marginalia.wmsa.edge.model; package nu.marginalia.model;
import lombok.Builder; import lombok.Builder;
import lombok.Getter; import lombok.Getter;
import lombok.Setter; import lombok.Setter;
import nu.marginalia.wmsa.edge.converting.processor.logic.QueryParams; import nu.marginalia.util.QueryParams;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
@ -12,7 +12,7 @@ import java.util.Optional;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@Getter @Setter @Builder @Getter @Setter @Builder
public class EdgeUrl implements WideHashable { public class EdgeUrl {
public final String proto; public final String proto;
public final EdgeDomain domain; public final EdgeDomain domain;
public final Integer port; public final Integer port;
@ -158,12 +158,6 @@ public class EdgeUrl implements WideHashable {
return path.replaceAll(".*/", ""); return path.replaceAll(".*/", "");
} }
public long wideHash() {
long domainHash = domain.hashCode();
long thisHash = hashCode();
return (domainHash << 32) | thisHash;
}
public int depth() { public int depth() {
return (int) path.chars().filter(c -> c=='/').count(); return (int) path.chars().filter(c -> c=='/').count();
} }

View File

@ -1,7 +1,7 @@
package nu.marginalia.wmsa.edge.converting.interpreter.instruction; package nu.marginalia.model.crawl;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordMetadata;
import nu.marginalia.wmsa.edge.model.crawl.EdgePageWords; import nu.marginalia.model.idx.EdgePageWordMetadata;
import java.util.Arrays; import java.util.Arrays;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model.crawl;
import lombok.*; import lombok.*;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model.crawl;
public enum EdgeDomainIndexingState { public enum EdgeDomainIndexingState {
ACTIVE("Active"), ACTIVE("Active"),

View File

@ -1,7 +1,7 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model.crawl;
import lombok.*; import lombok.*;
import nu.marginalia.wmsa.edge.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
@AllArgsConstructor @EqualsAndHashCode @Getter @Setter @Builder @ToString @AllArgsConstructor @EqualsAndHashCode @Getter @Setter @Builder @ToString
public class EdgeDomainLink { public class EdgeDomainLink {

View File

@ -1,4 +1,5 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model.crawl;
public enum EdgeHtmlStandard { public enum EdgeHtmlStandard {
PLAIN(0, 1, 1993), PLAIN(0, 1, 1993),
@ -8,9 +9,13 @@ public enum EdgeHtmlStandard {
XHTML(-0.1, 1.05, 2006), XHTML(-0.1, 1.05, 2006),
HTML5(0.5, 1.1, 2018); HTML5(0.5, 1.1, 2018);
/** Used to tune quality score */
public final double offset; public final double offset;
/** Used to tune quality score */
public final double scale; public final double scale;
/** This parameter is used to bias publish date heuristics
* */
public final int yearGuess; public final int yearGuess;
EdgeHtmlStandard(double offset, double scale, int yearGuess) { EdgeHtmlStandard(double offset, double scale, int yearGuess) {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.index.model; package nu.marginalia.model.crawl;
import java.util.EnumSet; import java.util.EnumSet;
@ -6,7 +6,7 @@ public enum EdgePageDocumentFlags {
/** Simple processing was done, this document should be de-prioritized as a search result */ /** Simple processing was done, this document should be de-prioritized as a search result */
Simple, Simple,
UnusedBit1, PlainText,
UnusedBit2, UnusedBit2,
UnusedBit3, UnusedBit3,
UnusedBit4, UnusedBit4,

View File

@ -1,9 +1,5 @@
package nu.marginalia.wmsa.edge.index.model; package nu.marginalia.model.crawl;
import nu.marginalia.util.language.processing.KeywordCounter;
import nu.marginalia.util.language.processing.NameCounter;
import nu.marginalia.util.language.processing.SubjectCounter;
import nu.marginalia.wmsa.edge.converting.processor.SiteWords;
import java.util.EnumSet; import java.util.EnumSet;

View File

@ -1,10 +1,8 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model.crawl;
import gnu.trove.list.array.TLongArrayList; import gnu.trove.list.array.TLongArrayList;
import lombok.Getter; import lombok.Getter;
import lombok.ToString; import lombok.ToString;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordFlags;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordMetadata;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -39,7 +37,7 @@ public class EdgePageWords {
List<Long> emptyMeta = new ArrayList<>(entries.size()); List<Long> emptyMeta = new ArrayList<>(entries.size());
for (int i = 0; i < entries.size(); i++) { for (int i = 0; i < entries.size(); i++) {
emptyMeta.add(EdgePageWordMetadata.emptyValue()); emptyMeta.add(0L);
} }
return new EdgePageWords(entries, emptyMeta); return new EdgePageWords(entries, emptyMeta);

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model.crawl;
/** This should correspond to EC_URL.STATE */ /** This should correspond to EC_URL.STATE */
public enum EdgeUrlState { public enum EdgeUrlState {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.converting.processor.logic; package nu.marginalia.model.crawl;
import java.util.Collection; import java.util.Collection;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.converting.processor.logic.pubdate; package nu.marginalia.model.crawl;
import java.time.LocalDate; import java.time.LocalDate;
import java.time.format.DateTimeFormatter; import java.time.format.DateTimeFormatter;

View File

@ -0,0 +1,64 @@
package nu.marginalia.model.dbcommon;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows;
import nu.marginalia.model.EdgeDomain;
import nu.marginalia.model.id.EdgeId;
import java.util.NoSuchElementException;
import java.util.Optional;
@Singleton
public class DbDomainQueries {
private final HikariDataSource dataSource;
private final Cache<EdgeDomain, EdgeId<EdgeDomain>> domainIdCache = CacheBuilder.newBuilder().maximumSize(10_000).build();
@Inject
public DbDomainQueries(HikariDataSource dataSource)
{
this.dataSource = dataSource;
}
@SneakyThrows
public EdgeId<EdgeDomain> getDomainId(EdgeDomain domain) {
try (var connection = dataSource.getConnection()) {
return domainIdCache.get(domain, () -> {
try (var stmt = connection.prepareStatement("SELECT ID FROM EC_DOMAIN WHERE DOMAIN_NAME=?")) {
stmt.setString(1, domain.toString());
var rsp = stmt.executeQuery();
if (rsp.next()) {
return new EdgeId<>(rsp.getInt(1));
}
}
throw new NoSuchElementException();
});
}
catch (UncheckedExecutionException ex) {
throw ex.getCause();
}
}
@SneakyThrows
public Optional<EdgeDomain> getDomain(EdgeId<EdgeDomain> id) {
try (var connection = dataSource.getConnection()) {
try (var stmt = connection.prepareStatement("SELECT DOMAIN_NAME FROM EC_DOMAIN WHERE ID=?")) {
stmt.setInt(1, id.id());
var rsp = stmt.executeQuery();
if (rsp.next()) {
return Optional.of(new EdgeDomain(rsp.getString(1)));
}
return Optional.empty();
}
}
}
}

View File

@ -1,9 +1,9 @@
package nu.marginalia.wmsa.edge.dbcommon; package nu.marginalia.model.dbcommon;
import com.google.inject.ImplementedBy; import com.google.inject.ImplementedBy;
import gnu.trove.set.hash.TIntHashSet; import gnu.trove.set.hash.TIntHashSet;
import nu.marginalia.wmsa.edge.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.id.EdgeId; import nu.marginalia.model.id.EdgeId;
@ImplementedBy(EdgeDomainBlacklistImpl.class) @ImplementedBy(EdgeDomainBlacklistImpl.class)
public interface EdgeDomainBlacklist { public interface EdgeDomainBlacklist {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.dbcommon; package nu.marginalia.model.dbcommon;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
@ -6,6 +6,7 @@ import com.zaxxer.hikari.HikariDataSource;
import gnu.trove.set.hash.TIntHashSet; import gnu.trove.set.hash.TIntHashSet;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.model.EdgeDomain;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -67,5 +68,4 @@ public class EdgeDomainBlacklistImpl implements EdgeDomainBlacklist {
return false; return false;
} }
} }

View File

@ -1,11 +1,11 @@
package nu.marginalia.wmsa.client; package nu.marginalia.model.gson;
import com.google.gson.*; import com.google.gson.*;
import marcono1234.gson.recordadapter.RecordTypeAdapterFactory; import marcono1234.gson.recordadapter.RecordTypeAdapterFactory;
import nu.marginalia.util.bigstring.BigString; import nu.marginalia.bigstring.BigString;
import nu.marginalia.wmsa.edge.model.EdgeDomain; import nu.marginalia.model.EdgeDomain;
import nu.marginalia.wmsa.edge.model.EdgeUrl; import nu.marginalia.model.EdgeUrl;
import nu.marginalia.wmsa.edge.model.id.EdgeId; import nu.marginalia.model.id.EdgeId;
import java.net.URISyntaxException; import java.net.URISyntaxException;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.id; package nu.marginalia.model.id;
/** /**

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.id; package nu.marginalia.model.id;
import java.util.Arrays; import java.util.Arrays;
import java.util.stream.IntStream; import java.util.stream.IntStream;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.id; package nu.marginalia.model.id;
import java.util.Arrays; import java.util.Arrays;
import java.util.stream.IntStream; import java.util.stream.IntStream;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.id; package nu.marginalia.model.id;
import gnu.trove.TIntCollection; import gnu.trove.TIntCollection;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.id; package nu.marginalia.model.id;
import gnu.trove.TIntCollection; import gnu.trove.TIntCollection;
import gnu.trove.list.array.TIntArrayList; import gnu.trove.list.array.TIntArrayList;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.model.id; package nu.marginalia.model.id;
import gnu.trove.TIntCollection; import gnu.trove.TIntCollection;
import gnu.trove.set.hash.TIntHashSet; import gnu.trove.set.hash.TIntHashSet;

View File

@ -1,6 +1,7 @@
package nu.marginalia.wmsa.edge.index.model; package nu.marginalia.model.idx;
import nu.marginalia.wmsa.edge.converting.processor.logic.pubdate.PubDate; import nu.marginalia.model.crawl.EdgePageDocumentFlags;
import nu.marginalia.model.crawl.PubDate;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.Set; import java.util.Set;
@ -107,7 +108,6 @@ public record EdgePageDocumentsMetadata(int rank,
return (int) ((encoded >>> TOPOLOGY_SHIFT) & TOPOLOGY_MASK); return (int) ((encoded >>> TOPOLOGY_SHIFT) & TOPOLOGY_MASK);
} }
public static int decodeYear(long encoded) { public static int decodeYear(long encoded) {
return PubDate.fromYearByte((int) ((encoded >>> YEAR_SHIFT) & YEAR_MASK)); return PubDate.fromYearByte((int) ((encoded >>> YEAR_SHIFT) & YEAR_MASK));
} }

View File

@ -1,5 +1,6 @@
package nu.marginalia.wmsa.edge.index.model; package nu.marginalia.model.idx;
import nu.marginalia.model.crawl.EdgePageWordFlags;
import nu.marginalia.util.BrailleBlockPunchCards; import nu.marginalia.util.BrailleBlockPunchCards;
import java.util.EnumSet; import java.util.EnumSet;

View File

@ -1,4 +1,6 @@
package nu.marginalia.wmsa.edge.converting.interpreter.instruction; package nu.marginalia.util;
import nu.marginalia.model.crawl.DocumentKeywords;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;

View File

@ -0,0 +1,47 @@
package nu.marginalia.util;
import java.util.ArrayList;
import java.util.List;
public class LineUtils {
/** LF, CR, CRLF, LFCR-agnostic string-line splitter that preserves empty lines
* that does not create a huge blob of a last item like String$split(regex, n)
*
*/
public static List<String> firstNLines(String documentBody, int numLines) {
List<String> lines = new ArrayList<>(numLines);
boolean eatCr = false;
boolean eatLf = false;
int startPos = 0;
for (int pos = 0; pos < documentBody.length() && lines.size() < numLines; pos++) {
int cp = documentBody.charAt(pos);
if (cp == '\r') {
if (eatCr) {
eatCr = false;
}
else {
eatLf = true;
lines.add(documentBody.substring(startPos, pos));
}
startPos = pos + 1;
} else if (cp == '\n') {
if (eatLf) {
eatLf = false;
}
else {
eatCr = true;
lines.add(documentBody.substring(startPos, pos));
}
startPos = pos + 1;
} else {
eatCr = eatLf = false;
}
}
return lines;
}
}

View File

@ -9,6 +9,12 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** Generalization of the workflow <br>
* -- single provider thread reading sequentially from disk <br>
* -> multiple independent CPU-bound processing tasks <br>
* -> single consumer thread writing to network/disk <br>
* <p>
*/
public abstract class ParallelPipe<INPUT,INTERMEDIATE> { public abstract class ParallelPipe<INPUT,INTERMEDIATE> {
private final LinkedBlockingQueue<INPUT> inputs; private final LinkedBlockingQueue<INPUT> inputs;
private final LinkedBlockingQueue<INTERMEDIATE> intermediates; private final LinkedBlockingQueue<INTERMEDIATE> intermediates;
@ -61,8 +67,9 @@ public abstract class ParallelPipe<INPUT,INTERMEDIATE> {
} }
} }
logger.debug("Terminating {}", Thread.currentThread().getName()); logger.info("Terminating {}", Thread.currentThread().getName());
} }
@SneakyThrows @SneakyThrows
private void runReceiverThread() { private void runReceiverThread() {
while (expectingOutput || !inputs.isEmpty() || !intermediates.isEmpty()) { while (expectingOutput || !inputs.isEmpty() || !intermediates.isEmpty()) {
@ -80,12 +87,16 @@ public abstract class ParallelPipe<INPUT,INTERMEDIATE> {
logger.info("Terminating {}", Thread.currentThread().getName()); logger.info("Terminating {}", Thread.currentThread().getName());
} }
/** Begin processing an item */
@SneakyThrows @SneakyThrows
public void accept(INPUT input) { public void accept(INPUT input) {
inputs.put(input); inputs.put(input);
} }
/** The meat of the processor thread runtime */
protected abstract INTERMEDIATE onProcess(INPUT input) throws Exception; protected abstract INTERMEDIATE onProcess(INPUT input) throws Exception;
/** The meat of the consumer thread runtime */
protected abstract void onReceive(INTERMEDIATE intermediate) throws Exception; protected abstract void onReceive(INTERMEDIATE intermediate) throws Exception;
public void join() throws InterruptedException { public void join() throws InterruptedException {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.edge.converting.processor.logic; package nu.marginalia.util;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;

View File

@ -74,7 +74,7 @@ CREATE TABLE IF NOT EXISTS EC_PAGE_DATA (
FORMAT ENUM('PLAIN', 'UNKNOWN', 'HTML123', 'HTML4', 'XHTML', 'HTML5', 'MARKDOWN') NOT NULL, FORMAT ENUM('PLAIN', 'UNKNOWN', 'HTML123', 'HTML4', 'XHTML', 'HTML5', 'MARKDOWN') NOT NULL,
FEATURES INT COMMENT "Bit-encoded feature set of document, @see HtmlFeature" NOT NULL, FEATURES INT COMMENT "Bit-encoded feature set of document, @see HtmlFeature" NOT NULL,
DATA_HASH INTEGER NOT NULL, DATA_HASH BIGINT NOT NULL,
QUALITY DOUBLE NOT NULL, QUALITY DOUBLE NOT NULL,
PUB_YEAR SMALLINT, PUB_YEAR SMALLINT,
@ -212,6 +212,13 @@ CREATE INDEX IF NOT EXISTS EC_DOMAIN_TOP_DOMAIN ON EC_DOMAIN (DOMAIN_TOP);
---; ---;
CREATE TABLE IF NOT EXISTS EC_RANDOM_DOMAINS (
DOMAIN_ID INT PRIMARY KEY,
DOMAIN_SET INT NOT NULL
);
---;
DROP TABLE IF EXISTS REF_DICTIONARY; DROP TABLE IF EXISTS REF_DICTIONARY;
CREATE TABLE IF NOT EXISTS REF_DICTIONARY ( CREATE TABLE IF NOT EXISTS REF_DICTIONARY (
@ -287,3 +294,4 @@ CREATE TABLE WMSA_PROCESS(
MUTEX VARCHAR(255), MUTEX VARCHAR(255),
TIMEOUT INT NOT NULL DEFAULT 60 TIMEOUT INT NOT NULL DEFAULT 60
); );

View File

@ -1,5 +1,6 @@
package nu.marginalia.wmsa.edge.model; package nu.marginalia.model;
import nu.marginalia.model.EdgeUrl;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.URISyntaxException; import java.net.URISyntaxException;

View File

@ -1,7 +1,7 @@
package nu.marginalia.wmsa.edge.model.crawl; package nu.marginalia.model;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordFlags; import nu.marginalia.model.crawl.EdgePageWordFlags;
import nu.marginalia.wmsa.edge.index.model.EdgePageWordMetadata; import nu.marginalia.model.idx.EdgePageWordMetadata;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.util.EnumSet; import java.util.EnumSet;

View File

@ -1,5 +1,6 @@
package nu.marginalia.wmsa.edge.model; package nu.marginalia.model;
import nu.marginalia.model.EdgeUrl;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import java.net.URISyntaxException; import java.net.URISyntaxException;

View File

@ -1,5 +1,6 @@
package nu.marginalia.util; package nu.marginalia.util;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.*; import static org.junit.jupiter.api.Assertions.*;

View File

@ -0,0 +1,16 @@
package nu.marginalia.util;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
class LineUtilsTest {
@Test
void firstNLines() {
String text = "a\nb\r\ncd\r\re\n\rffgg\n\n";
List<String> expected = List.of("a", "b", "cd", "", "e", "ffgg", "");
Assertions.assertEquals(expected, LineUtils.firstNLines(text, 10));
}
}

9
common/readme.md Normal file
View File

@ -0,0 +1,9 @@
# Common
These are packages containing the basic building blocks for running a service as well
as shared models.
* [config](config/) contains some `@Inject`ables.
* [service](service/) is the shared base classes for main methods and web services.
* [service-client](service-client/) is the shared base class for RPC.
* [service-discovery](service-discovery) contains tools that lets the services find each other.

View File

@ -0,0 +1,56 @@
plugins {
id 'java'
id "io.freefair.lombok" version "5.3.3.3"
id "de.undercouch.download" version "5.1.0"
id 'jvm-test-suite'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':third-party')
implementation project(':protocol')
implementation project(':common:service-discovery')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.bundles.slf4j
implementation libs.commons.lang3
implementation libs.spark
implementation libs.guice
implementation libs.rxjava
implementation libs.okhttp3
implementation libs.bundles.httpcomponents
implementation libs.bundles.gson
implementation libs.protobuf
implementation libs.bundles.prometheus
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}
test {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
maxHeapSize = "8G"
useJUnitPlatform()
}
task fastTests(type: Test) {
maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
maxHeapSize = "8G"
useJUnitPlatform {
excludeTags "slow"
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client; package nu.marginalia.client;
import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.reactivex.rxjava3.core.Scheduler; import io.reactivex.rxjava3.core.Scheduler;
@ -12,7 +12,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
public class AbortingScheduler implements AutoCloseable { public class AbortingScheduler {
private final String name; private final String name;
private final ThreadFactory threadFactory; private final ThreadFactory threadFactory;
@ -54,7 +54,6 @@ public class AbortingScheduler implements AutoCloseable {
return executorService; return executorService;
} }
@Override
public synchronized void close() { public synchronized void close() {
if (null != executorService) { if (null != executorService) {
executorService.shutdown(); executorService.shutdown();

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client; package nu.marginalia.client;
import com.google.gson.Gson; import com.google.gson.Gson;
import com.google.protobuf.GeneratedMessageV3; import com.google.protobuf.GeneratedMessageV3;
@ -6,11 +6,10 @@ import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.ObservableSource; import io.reactivex.rxjava3.core.ObservableSource;
import io.reactivex.rxjava3.plugins.RxJavaPlugins; import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.wmsa.client.exception.LocalException; import nu.marginalia.client.exception.LocalException;
import nu.marginalia.wmsa.client.exception.NetworkException; import nu.marginalia.client.exception.NetworkException;
import nu.marginalia.wmsa.client.exception.RemoteException; import nu.marginalia.client.exception.RemoteException;
import nu.marginalia.wmsa.client.exception.RouteNotConfiguredException; import nu.marginalia.client.exception.RouteNotConfiguredException;
import nu.marginalia.wmsa.configuration.server.Context;
import okhttp3.*; import okhttp3.*;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.logging.log4j.ThreadContext; import org.apache.logging.log4j.ThreadContext;
@ -24,32 +23,33 @@ import java.net.ConnectException;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.zip.GZIPOutputStream; import java.util.zip.GZIPOutputStream;
public abstract class AbstractClient implements AutoCloseable { public abstract class AbstractClient implements AutoCloseable {
public static final String CONTEXT_OUTBOUND_REQUEST = "outbound-request";
private final Gson gson = GsonFactory.get();
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
public static final String CONTEXT_OUTBOUND_REQUEST = "outbound-request";
private final Gson gson;
private final OkHttpClient client; private final OkHttpClient client;
private boolean quiet; private boolean quiet;
private String url; private String serviceRoute;
private int timeout;
private volatile boolean alive;
private final Thread livenessMonitor;
public void setTimeout(int timeout) { public void setTimeout(int timeout) {
this.timeout = timeout; this.timeout = timeout;
} }
private int timeout; public AbstractClient(String host, int port, int timeout, Supplier<Gson> gsonProvider) {
private volatile boolean alive;
private final Thread livenessMonitor;
public AbstractClient(String host, int port, int timeout) {
logger.info("Creating client for {}[{}:{}]", getClass().getSimpleName(), host, port); logger.info("Creating client for {}[{}:{}]", getClass().getSimpleName(), host, port);
this.gson = gsonProvider.get();
this.timeout = timeout; this.timeout = timeout;
client = new OkHttpClient.Builder() client = new OkHttpClient.Builder()
.connectTimeout(100, TimeUnit.MILLISECONDS) .connectTimeout(100, TimeUnit.MILLISECONDS)
@ -57,7 +57,7 @@ public abstract class AbstractClient implements AutoCloseable {
.retryOnConnectionFailure(true) .retryOnConnectionFailure(true)
.followRedirects(true) .followRedirects(true)
.build(); .build();
url = new HttpHost(host, port).toURI(); serviceRoute = new HttpHost(host, port).toURI();
RxJavaPlugins.setErrorHandler(e -> { RxJavaPlugins.setErrorHandler(e -> {
if (e.getMessage() == null) { if (e.getMessage() == null) {
@ -76,28 +76,37 @@ public abstract class AbstractClient implements AutoCloseable {
public void setServiceRoute(String hostname, int port) { public void setServiceRoute(String hostname, int port) {
scheduler().abort(); scheduler().abort();
url = new HttpHost(hostname, port).toURI(); serviceRoute = new HttpHost(hostname, port).toURI();
}
protected String getServiceRoute() {
return serviceRoute;
} }
@SneakyThrows @SneakyThrows
private void monitorLiveness() { private void monitorLiveness() {
Thread.sleep(100); // Wait for initialization Thread.sleep(100); // Wait for initialization
for (;;) { try {
try { for (; ; ) {
alive = isResponsive(); try {
} alive = isResponsive();
// }
catch (Exception ex) { //
logger.warn("Oops", ex); catch (Exception ex) {
} logger.warn("Oops", ex);
synchronized (livenessMonitor) { }
if (alive) { synchronized (livenessMonitor) {
livenessMonitor.wait(1000); if (alive) {
livenessMonitor.wait(1000);
}
}
if (!alive) {
Thread.sleep(100);
} }
} }
if (!alive) { }
Thread.sleep(100); catch (InterruptedException ex) {
} // nothing to see here
} }
} }
@ -117,11 +126,9 @@ public abstract class AbstractClient implements AutoCloseable {
public synchronized boolean isResponsive() { public synchronized boolean isResponsive() {
Context ctx = Context.internal("ping"); Context ctx = Context.internal("ping");
var req = ctx.paint(new Request.Builder()).url(url + "/internal/ping").get().build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + "/internal/ping").get().build();
var call = client.newCall(req); return Observable.just(client.newCall(req))
return Observable.just(call)
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(Call::execute) .map(Call::execute)
.map(this::getResponseStatus) .map(this::getResponseStatus)
@ -135,11 +142,9 @@ public abstract class AbstractClient implements AutoCloseable {
public synchronized boolean isAccepting() { public synchronized boolean isAccepting() {
Context ctx = Context.internal("ready"); Context ctx = Context.internal("ready");
var req = ctx.paint(new Request.Builder()).url(url + "/internal/ready").get().build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + "/internal/ready").get().build();
var call = client.newCall(req); return Observable.just(client.newCall(req))
return Observable.just(call)
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(Call::execute) .map(Call::execute)
.map(this::getResponseStatus) .map(this::getResponseStatus)
@ -156,19 +161,12 @@ public abstract class AbstractClient implements AutoCloseable {
ensureAlive(); ensureAlive();
RequestBody body = RequestBody.create( RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json; charset=utf-8"));
MediaType.parse("application/json; charset=utf-8"),
json(data));
var req = ctx.paint(new Request.Builder()).url(url + endpoint).post(body).build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
var call = client.newCall(req);
return Observable return Observable
.just(call) .just(client.newCall(req))
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(this::logInbound) .map(this::logInbound)
.map(Call::execute) .map(Call::execute)
@ -186,15 +184,13 @@ public abstract class AbstractClient implements AutoCloseable {
ensureAlive(); ensureAlive();
RequestBody body = RequestBody.create( RequestBody body = RequestBody.create(data.toByteArray(), MediaType.parse("application/protobuf"));
MediaType.parse("application/protobuf"),
data.toByteArray());
var req = ctx.paint(new Request.Builder()).url(url + endpoint).post(body).build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
var call = client.newCall(req); var call = client.newCall(req);
logInbound(call); logInbound(call);
ThreadContext.put("outbound-request", url + endpoint); ThreadContext.put("outbound-request", serviceRoute + endpoint);
try (var rsp = call.execute()) { try (var rsp = call.execute()) {
logOutbound(rsp); logOutbound(rsp);
int code = rsp.code(); int code = rsp.code();
@ -212,18 +208,10 @@ public abstract class AbstractClient implements AutoCloseable {
ensureAlive(); ensureAlive();
RequestBody body = RequestBody.create( RequestBody body = RequestBody.create(json(data), MediaType.parse("application/json"));
MediaType.parse("application/json"), var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
json(data));
var req = ctx.paint(new Request.Builder()).url(url + endpoint).post(body).build(); return Observable.just(client.newCall(req))
var call = client.newCall(req);
return Observable.just(call)
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(this::logInbound) .map(this::logInbound)
.map(Call::execute) .map(Call::execute)
@ -238,15 +226,15 @@ public abstract class AbstractClient implements AutoCloseable {
protected synchronized Observable<HttpStatusCode> post(Context ctx, String endpoint, String data, MediaType mediaType) { protected synchronized Observable<HttpStatusCode> post(Context ctx, String endpoint, String data, MediaType mediaType) {
ensureAlive(); ensureAlive();
var body = RequestBody.create(mediaType, data); var body = RequestBody.create(data, mediaType);
var req = ctx.paint(new Request.Builder()).url(url + endpoint).post(body).build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).post(body).build();
var call = client.newCall(req); var call = client.newCall(req);
return Observable.just(call) return Observable.just(call)
.map((c) -> { .map((c) -> {
ThreadContext.put(CONTEXT_OUTBOUND_REQUEST, url + endpoint); ThreadContext.put(CONTEXT_OUTBOUND_REQUEST, serviceRoute + endpoint);
return c; return c;
}) })
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
@ -264,14 +252,9 @@ public abstract class AbstractClient implements AutoCloseable {
protected synchronized <T> Observable<T> get(Context ctx, String endpoint, Class<T> type) { protected synchronized <T> Observable<T> get(Context ctx, String endpoint, Class<T> type) {
ensureAlive(); ensureAlive();
var req = ctx.paint(new Request.Builder()).url(url + endpoint).get().build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).get().build();
var call = client.newCall(req);
return Observable.just(call) return Observable.just(client.newCall(req))
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(this::logInbound) .map(this::logInbound)
.map(Call::execute) .map(Call::execute)
@ -284,62 +267,12 @@ public abstract class AbstractClient implements AutoCloseable {
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected synchronized <T> Observable<List<T>> getList(Context ctx, String endpoint, Class<T> type) {
ensureAlive();
var req = ctx.paint(new Request.Builder()).url(url + endpoint).get().build();
var call = client.newCall(req);
return Observable.just(call)
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get())
.map(this::logInbound)
.map(Call::execute)
.map(this::logOutbound)
.map(rsp -> validateResponseStatus(rsp, req, 200))
.map(rsp -> Arrays.asList((T[])getEntity(rsp, type.arrayType())))
.retryWhen(this::retryHandler)
.timeout(timeout, TimeUnit.SECONDS)
.doFinally(() -> ThreadContext.remove("outbound-request"));
}
protected synchronized Observable<byte[]> getBinary(Context ctx, String endpoint) {
ensureAlive();
var req = ctx.paint(new Request.Builder()).url(url + endpoint).get().build();
var call = client.newCall(req);
return Observable.just(call)
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get())
.map(this::logInbound)
.map(Call::execute)
.map(this::logOutbound)
.map(rsp -> validateResponseStatus(rsp, req, 200))
.map(this::getBinaryEntity)
.retryWhen(this::retryHandler)
.timeout(timeout, TimeUnit.SECONDS)
.doFinally(() -> ThreadContext.remove("outbound-request"));
}
protected synchronized Observable<String> get(Context ctx, String endpoint) { protected synchronized Observable<String> get(Context ctx, String endpoint) {
ensureAlive(); ensureAlive();
var req = ctx.paint(new Request.Builder()).url(url + endpoint).get().build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).get().build();
var call = client.newCall(req);
return Observable.just(call) return Observable.just(client.newCall(req))
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(this::logInbound) .map(this::logInbound)
.map(Call::execute) .map(Call::execute)
@ -354,14 +287,9 @@ public abstract class AbstractClient implements AutoCloseable {
protected synchronized Observable<HttpStatusCode> delete(Context ctx, String endpoint) { protected synchronized Observable<HttpStatusCode> delete(Context ctx, String endpoint) {
ensureAlive(); ensureAlive();
var req = ctx.paint(new Request.Builder()).url(url + endpoint).delete().build(); var req = ctx.paint(new Request.Builder()).url(serviceRoute + endpoint).delete().build();
var call = client.newCall(req);
return Observable.just(call) return Observable.just(client.newCall(req))
.map((c) -> {
ThreadContext.put("outbound-request", url + endpoint);
return c;
})
.subscribeOn(scheduler().get()) .subscribeOn(scheduler().get())
.map(this::logInbound) .map(this::logInbound)
.map(Call::execute) .map(Call::execute)
@ -390,26 +318,12 @@ public abstract class AbstractClient implements AutoCloseable {
if (!isAlive()) { if (!isAlive()) {
wait(2000); wait(2000);
if (!isAlive()) { if (!isAlive()) {
throw new RouteNotConfiguredException("Route not configured for " + name()); throw new RouteNotConfiguredException("Route not configured for " + name() + " -- tried " + serviceRoute);
} }
} }
} }
@SneakyThrows
public void waitReady() {
boolean accepting = isAccepting();
if (accepting) {
return;
}
logger.info("Waiting for " + name());
do {
Thread.sleep(1000);
} while (!isAccepting());
}
private ObservableSource<?> retryHandler(Observable<Throwable> error) { private ObservableSource<?> retryHandler(Observable<Throwable> error) {
return error.flatMap(this::filterRetryableExceptions); return error.flatMap(this::filterRetryableExceptions);
} }
@ -488,12 +402,6 @@ public abstract class AbstractClient implements AutoCloseable {
} }
@SneakyThrows
private byte[] getBinaryEntity(Response response) {
try (response) {
return response.body().bytes();
}
}
public boolean isAlive() { public boolean isAlive() {
return alive; return alive;
} }
@ -507,17 +415,4 @@ public abstract class AbstractClient implements AutoCloseable {
} }
} }
private byte[] compressedJson(Object o) throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
GZIPOutputStream gos = new GZIPOutputStream(baos);
try {
gson.toJson(o, new OutputStreamWriter(gos));
gos.finish();
return baos.toByteArray();
}
catch (Exception ex) {
throw new LocalException(ex);
}
}
} }

View File

@ -1,21 +1,23 @@
package nu.marginalia.wmsa.client; package nu.marginalia.client;
import com.google.gson.Gson;
import io.reactivex.rxjava3.core.Observable; import io.reactivex.rxjava3.core.Observable;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.wmsa.configuration.ServiceDescriptor; import nu.marginalia.service.descriptor.ServiceDescriptor;
import nu.marginalia.wmsa.configuration.server.Context; import nu.marginalia.service.descriptor.HostsFile;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull; import javax.annotation.Nonnull;
import java.util.function.Supplier;
public class AbstractDynamicClient extends AbstractClient { public class AbstractDynamicClient extends AbstractClient {
private final ServiceDescriptor service; private final ServiceDescriptor service;
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
private final AbortingScheduler scheduler; private final AbortingScheduler scheduler;
public AbstractDynamicClient(@Nonnull ServiceDescriptor service) { public AbstractDynamicClient(@Nonnull ServiceDescriptor service, HostsFile hosts, Supplier<Gson> gsonProvider) {
super(service.getHost(), service.port, 10); super(hosts.getHost(service), service.port, 10, gsonProvider);
this.service = service; this.service = service;
this.scheduler = new AbortingScheduler(name()); this.scheduler = new AbortingScheduler(name());
@ -32,7 +34,7 @@ public class AbstractDynamicClient extends AbstractClient {
@SneakyThrows @SneakyThrows
public void blockingWait() { public void blockingWait() {
logger.info("Waiting for route to {}", service); logger.info("Waiting for route to {} ({})", service, getServiceRoute());
while (!isAlive()) { while (!isAlive()) {
Thread.sleep(1000); Thread.sleep(1000);
} }
@ -43,10 +45,4 @@ public class AbstractDynamicClient extends AbstractClient {
return scheduler; return scheduler;
} }
public Observable<String> who(Context ctx) {
return get(ctx, "/public/who");
}
public Observable<String> ping(Context ctx) {
return get(ctx, "/internal/ping");
}
} }

View File

@ -0,0 +1,89 @@
package nu.marginalia.client;
import io.reactivex.rxjava3.schedulers.Schedulers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import spark.Request;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class Context {
public static final String CONTEXT_HEADER = "X-Context";
public static final String SESSION_HEADER = "Cookie";
public static final String PUBLIC_HEADER = "X-Public";
private final String id;
private final String session;
private boolean treatAsPublic;
private Context(String id, String session) {
this.id = Objects.requireNonNull(id, "Context missing");
this.session = session;
}
public Context treatAsPublic() {
this.treatAsPublic = true;
return this;
}
public static Context internal() {
return new Context(UUID.randomUUID().toString(), null);
}
public static Context internal(String why) {
return new Context(why + ":" + System.nanoTime(), null);
}
public static Context fromRequest(Request request) {
if (Boolean.getBoolean("unit-test")) {
return Context.internal();
}
final var ctxHeader = anonymizeContext(request);
final var sessHeader = request.headers(SESSION_HEADER);
return new Context(ctxHeader, sessHeader);
}
private static String anonymizeContext(Request request) {
String header = request.headers(CONTEXT_HEADER);
if (header != null && header.contains("-")) {
// The public X-Context header contains info that traces to the
// external user's IP. Anonymize this by running it through a
// hash code blender with rotating salt
return ContextScrambler.anonymize(header);
}
else if (header != null) {
return header;
}
else {
// When no X-Context is provided, synthesize one from path
return request.pathInfo() + ":" + Thread.currentThread().getId();
}
}
public okhttp3.Request.Builder paint(okhttp3.Request.Builder requestBuilder) {
requestBuilder.addHeader(CONTEXT_HEADER, id);
if (session != null) {
requestBuilder.addHeader(SESSION_HEADER, session);
}
if (treatAsPublic) {
requestBuilder.header(PUBLIC_HEADER, "1");
}
return requestBuilder;
}
public String getContextId() {
return id;
}
public boolean isPublic() {
return id.startsWith("#");
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client; package nu.marginalia.client;
public final class HttpStatusCode { public final class HttpStatusCode {
public final int code; public final int code;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client.exception; package nu.marginalia.client.exception;
public class LocalException extends MessagingException { public class LocalException extends MessagingException {
public LocalException() { public LocalException() {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client.exception; package nu.marginalia.client.exception;
public class MessagingException extends RuntimeException { public class MessagingException extends RuntimeException {
public MessagingException() { public MessagingException() {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client.exception; package nu.marginalia.client.exception;
public class NetworkException extends MessagingException { public class NetworkException extends MessagingException {
public NetworkException() { public NetworkException() {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client.exception; package nu.marginalia.client.exception;
public class RemoteException extends MessagingException { public class RemoteException extends MessagingException {
public RemoteException() { public RemoteException() {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client.exception; package nu.marginalia.client.exception;
public class RouteNotConfiguredException extends MessagingException { public class RouteNotConfiguredException extends MessagingException {
public RouteNotConfiguredException() { public RouteNotConfiguredException() {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.client.exception; package nu.marginalia.client.exception;
public class TimeoutException extends MessagingException { public class TimeoutException extends MessagingException {
public TimeoutException() { public TimeoutException() {

View File

@ -0,0 +1,180 @@
package nu.marginalia.client;
import com.google.gson.Gson;
import io.reactivex.rxjava3.core.Observable;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.SneakyThrows;
import org.junit.jupiter.api.*;
import spark.Request;
import spark.Response;
import spark.Spark;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.*;
public class AbstractClientTest {
static TestServer testServer;
static AbstractClient client;
Gson gson = new Gson();
@Data @AllArgsConstructor
private static class DummyObject {
public int num;
public String str;
}
@BeforeAll
public static void setUp() {
int port = new Random().nextInt(6000, 10000);
testServer = new TestServer(port);
client = new AbstractClient("localhost", port, 1, Gson::new) {
@Override
public AbortingScheduler scheduler() {
return new AbortingScheduler(name());
}
@Override
public String name() {
return "test";
}
};
client.setTimeout(1);
}
@AfterAll
public static void tearDown() {
testServer.close();
client.close();
}
private void assertError(Observable<?> observable) {
try {
observable.blockingSubscribe();
}
catch (RuntimeException ex) {
System.out.println("Got exception " + ex.getClass().getSimpleName() + " -- as expected!" );
return;
}
Assertions.fail("Expected exception");
}
@SneakyThrows
private Object timeout(Request request, Response response) {
Thread.sleep(5000);
return "yawn";
}
@SneakyThrows
private Object error404(Request request, Response response) {
Spark.halt(404);
return "";
}
@Test
public void testGetTimeout() {
testServer.get(this::timeout);
assertError(client.get(Context.internal(), "/get"));
}
@Test
public void testPostTimeout() {
testServer.post(this::timeout);
assertError(client.post(Context.internal(), "/post", "test"));
}
@Test
public void testDeleteTimeout() {
testServer.delete(this::timeout);
assertError(client.delete(Context.internal(), "/post"));
}
@Test
public void testPost404() {
testServer.post(this::error404);
assertError(client.post(Context.internal(), "/post", "test"));
}
@Test
public void testGet404() {
testServer.get(this::error404);
assertError(client.get(Context.internal(), "/get"));
}
@Test
public void testDelete404() {
testServer.delete(this::error404);
assertError(client.delete(Context.internal(), "/delete"));
}
@Test
public void testGet() {
testServer.get((req, rsp) -> "Hello World");
assertEquals("Hello World", client.get(Context.internal(), "/get").blockingFirst());
}
@Test
public void testAcceptingUp() {
testServer.setReady(true);
assertTrue(client.isAccepting());
}
@Test
public void testAcceptingDown() {
testServer.setReady(false);
assertFalse(client.isAccepting());
}
@Test
public void testGetJson() {
testServer.get((req, rsp) -> new DummyObject(5, "23"), new Gson()::toJson);
assertEquals(client.get(Context.internal(), "/get", DummyObject.class).blockingFirst(),
new DummyObject(5, "23"));
}
@Test
public void testDelete() {
testServer.delete((req, rsp) -> "Hello World");
assertTrue(client.delete(Context.internal(), "/delete").blockingFirst().isGood());
}
@Test
public void testPost() {
List<DummyObject> inbox = new ArrayList<>();
testServer.post((req, rsp) -> {
inbox.add(gson.fromJson(req.body(), DummyObject.class));
return "ok";
});
client.post(Context.internal(), "/post", new DummyObject(5, "23")).blockingSubscribe();
assertEquals(1, inbox.size());
assertEquals(new DummyObject(5, "23"), inbox.get(0));
}
@Test
public void testPostGet() {
List<DummyObject> inbox = new ArrayList<>();
testServer.post((req, rsp) -> {
inbox.add(gson.fromJson(req.body(), DummyObject.class));
return new DummyObject(1, "ret");
}, gson::toJson);
var ret = client.postGet(Context.internal(), "/post", new DummyObject(5, "23"), DummyObject.class).blockingFirst();
assertEquals(1, inbox.size());
assertEquals(new DummyObject(5, "23"), inbox.get(0));
assertEquals(new DummyObject(1, "ret"), ret);
}
}

View File

@ -0,0 +1,59 @@
package nu.marginalia.client;
import spark.Request;
import spark.Response;
import spark.Spark;
import java.util.function.BiFunction;
import java.util.function.Function;
public class TestServer {
BiFunction<Request, Response, Object> onGet;
BiFunction<Request, Response, Object> onPost;
BiFunction<Request, Response, Object> onDelete;
boolean isReady;
public TestServer(int port) {
Spark.port(port);
Spark.get("/internal/ping", (r,q) -> "pong");
Spark.get("/internal/ready", this::ready);
Spark.get("/get", (request, response) -> onGet.apply(request, response));
Spark.post("/post", (request, response) -> onPost.apply(request, response));
Spark.delete("/delete", (request, response) -> onDelete.apply(request, response));
}
private Object ready(Request request, Response response) {
if (isReady) {
return "";
}
else {
response.status(401);
return "bad";
}
}
public void close() {
Spark.stop();
}
public boolean isReady() {
return isReady;
}
public void setReady(boolean ready) {
isReady = ready;
}
public TestServer get(BiFunction<Request, Response, Object> onGet) { this.onGet = onGet; return this; }
public TestServer get(BiFunction<Request, Response, Object> onGet, Function<Object, Object> transform) {
this.onGet = onGet.andThen(transform);
return this;
}
public TestServer delete(BiFunction<Request, Response, Object> onDelete) { this.onDelete = onDelete; return this; }
public TestServer post(BiFunction<Request, Response, Object> onPost) { this.onPost = onPost; return this; }
public TestServer post(BiFunction<Request, Response, Object> onPost, Function<Object, Object> transform) {
this.onPost = onPost.andThen(transform); return this;
}
}

View File

@ -0,0 +1,22 @@
plugins {
id 'java'
}
repositories {
mavenLocal()
mavenCentral()
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation libs.bundles.slf4j
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

View File

@ -0,0 +1,17 @@
package nu.marginalia.service;
import nu.marginalia.service.descriptor.ServiceDescriptor;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import java.util.List;
public class SearchServiceDescriptors {
public static ServiceDescriptors descriptors = new ServiceDescriptors(
List.of(new ServiceDescriptor(ServiceId.Api, 5004),
new ServiceDescriptor(ServiceId.Index, 5021),
new ServiceDescriptor(ServiceId.Search, 5023),
new ServiceDescriptor(ServiceId.Assistant, 5025),
new ServiceDescriptor(ServiceId.Dating, 5070),
new ServiceDescriptor(ServiceId.Explorer, 5071)));
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration; package nu.marginalia.service.descriptor;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -13,7 +13,7 @@ import java.util.Map;
* *
* */ * */
public class HostsFile { public class HostsFile {
private final Map<ServiceDescriptor, String> hostsMap = new HashMap<>(ServiceDescriptor.values().length); private final Map<String, String> hostsMap = new HashMap<>();
private static final Logger logger = LoggerFactory.getLogger(HostsFile.class); private static final Logger logger = LoggerFactory.getLogger(HostsFile.class);
public HostsFile(Path fileName) throws IOException { public HostsFile(Path fileName) throws IOException {
var lines = Files.readAllLines(fileName); var lines = Files.readAllLines(fileName);
@ -27,7 +27,7 @@ public class HostsFile {
String hostName = parts[1]; String hostName = parts[1];
try { try {
hostsMap.put(ServiceDescriptor.byName(descriptorName), hostName); hostsMap.put(descriptorName, hostName);
} }
catch (IllegalArgumentException ex) { catch (IllegalArgumentException ex) {
logger.warn("Hosts file contains entry for unknown service {}", descriptorName); logger.warn("Hosts file contains entry for unknown service {}", descriptorName);
@ -36,13 +36,10 @@ public class HostsFile {
} }
public HostsFile() { public HostsFile() {
for (var sd : ServiceDescriptor.values()) {
hostsMap.put(sd, "localhost");
}
} }
public String getHost(ServiceDescriptor sd) { public String getHost(ServiceDescriptor sd) {
return hostsMap.get(sd); return hostsMap.getOrDefault(sd.name, sd.name);
} }
} }

View File

@ -0,0 +1,23 @@
package nu.marginalia.service.descriptor;
import nu.marginalia.service.id.ServiceId;
public class ServiceDescriptor {
public final ServiceId id;
public final String name;
public final int port;
public ServiceDescriptor(ServiceId id, int port) {
this.id = id;
this.name = id.name;
this.port = port;
}
public String toString() {
return name;
}
public String describeService() {
return String.format("%s", name);
}
}

View File

@ -0,0 +1,31 @@
package nu.marginalia.service.descriptor;
import nu.marginalia.service.id.ServiceId;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
public class ServiceDescriptors {
private final Map<ServiceId, ServiceDescriptor> descriptorsAll = new LinkedHashMap<>();
public ServiceDescriptors() {
}
public ServiceDescriptors(List<ServiceDescriptor> descriptors) {
descriptors.forEach(d -> descriptorsAll.put(d.id, d));
}
public ServiceDescriptor[] values() {
return descriptorsAll.values().toArray(ServiceDescriptor[]::new);
}
public ServiceDescriptor forId(ServiceId id) {
return Objects.requireNonNull(descriptorsAll.get(id),
"No service descriptor defined for " + id + " -- did you forget to "
+ "bind(ServiceDescriptors.class).toInstance(SearchServiceDescriptors.descriptors); ?");
}
}

View File

@ -0,0 +1,25 @@
package nu.marginalia.service.id;
public enum ServiceId {
Assistant("assistant-service"),
Api("api-service"),
Search("search-service"),
Index("index-service"),
Dating("dating-service"),
Explorer("explorer-service"),
Other_Auth("auth"),
Other_Memex("memex"),
Other_ResourceStore("resource-store"),
Other_Renderer("renderer"),
Other_PodcastScraper("podcast-scraper");
public final String name;
ServiceId(String name) {
this.name = name;
}
}

View File

@ -0,0 +1,35 @@
plugins {
id 'java'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(17))
}
}
dependencies {
implementation project(':common:service-client')
implementation project(':common:service-discovery')
implementation project(':libraries:misc')
implementation libs.lombok
annotationProcessor libs.lombok
implementation libs.spark
implementation libs.guice
implementation libs.rxjava
implementation libs.bundles.prometheus
implementation libs.bundles.slf4j
implementation libs.bucket4j
testImplementation libs.bundles.slf4j.test
implementation libs.bundles.mariadb
testImplementation libs.bundles.slf4j.test
testImplementation libs.bundles.junit
testImplementation libs.mockito
}

View File

@ -0,0 +1,62 @@
package nu.marginalia.service;
import io.prometheus.client.hotspot.DefaultExports;
import io.reactivex.rxjava3.exceptions.UndeliverableException;
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.client.exception.NetworkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.SocketTimeoutException;
import java.net.UnknownHostException;
public abstract class MainClass {
private final Logger logger = LoggerFactory.getLogger(getClass());
public MainClass() {
RxJavaPlugins.setErrorHandler(this::handleError);
}
protected void handleError(Throwable ex) {
if (ex instanceof UndeliverableException) {
ex = ex.getCause();
}
if (ex instanceof SocketTimeoutException) {
logger.warn("SocketTimeoutException");
}
else if (ex instanceof UnknownHostException) {
logger.warn("UnknownHostException");
}
else if (ex instanceof NetworkException) {
logger.warn("NetworkException", ex);
}
else {
logger.error("Uncaught exception", ex);
}
}
protected static void init(ServiceId id, String... args) {
System.setProperty("log4j2.isThreadContextMapInheritable", "true");
System.setProperty("isThreadContextMapInheritable", "true");
System.setProperty("service-name", id.name);
initJdbc();
initPrometheus();
}
private static void initJdbc() {
// This looks weird, but it's just for running the static block
// in the driver class so that it registers itself
new org.mariadb.jdbc.Driver();
}
private static void initPrometheus() {
DefaultExports.initialize();
}
}

View File

@ -0,0 +1,35 @@
package nu.marginalia.service.module;
import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.name.Named;
import com.google.inject.name.Names;
import nu.marginalia.service.descriptor.ServiceDescriptors;
import nu.marginalia.service.id.ServiceId;
import java.util.Objects;
public class ConfigurationModule extends AbstractModule {
private static final String SERVICE_NAME = System.getProperty("service-name");
private final ServiceDescriptors descriptors;
private final ServiceId id;
public ConfigurationModule(ServiceDescriptors descriptors, ServiceId id) {
this.descriptors = descriptors;
this.id = id;
}
public void configure() {
bind(ServiceDescriptors.class).toInstance(descriptors);
bind(String.class).annotatedWith(Names.named("service-name")).toInstance(Objects.requireNonNull(SERVICE_NAME));
bind(String.class).annotatedWith(Names.named("service-host")).toInstance(System.getProperty("service-host", "127.0.0.1"));
bind(Integer.class).annotatedWith(Names.named("service-port")).toInstance(descriptors.forId(id).port);
}
@Provides
@Named("metrics-server-port")
public Integer provideMetricsServerPort(@Named("service-port") Integer servicePort) {
return servicePort + 1000;
}
}

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration.module; package nu.marginalia.service.module;
import com.google.inject.AbstractModule; import com.google.inject.AbstractModule;
import com.google.inject.Provides; import com.google.inject.Provides;
@ -6,17 +6,15 @@ import com.google.inject.Singleton;
import com.zaxxer.hikari.HikariConfig; import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource; import com.zaxxer.hikari.HikariDataSource;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import nu.marginalia.wmsa.configuration.WmsaHome;
import org.h2.tools.RunScript;
import org.mariadb.jdbc.Driver; import org.mariadb.jdbc.Driver;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.file.Files; import java.nio.file.Files;
import java.nio.file.Path; import java.nio.file.Path;
import java.util.Optional;
import java.util.Properties; import java.util.Properties;
public class DatabaseModule extends AbstractModule { public class DatabaseModule extends AbstractModule {
@ -35,7 +33,7 @@ public class DatabaseModule extends AbstractModule {
} }
private Properties loadDbProperties() { private Properties loadDbProperties() {
Path propDir = WmsaHome.getHomePath().resolve("conf/db.properties"); Path propDir = getHomePath().resolve("conf/db.properties");
if (!Files.isRegularFile(propDir)) { if (!Files.isRegularFile(propDir)) {
throw new IllegalStateException("Database properties file " + propDir + " does not exist"); throw new IllegalStateException("Database properties file " + propDir + " does not exist");
} }
@ -56,22 +54,27 @@ public class DatabaseModule extends AbstractModule {
} }
public static Path getHomePath() {
var retStr = Optional.ofNullable(System.getenv("WMSA_HOME")).orElse("/var/lib/wmsa");
var ret = Path.of(retStr);
if (!Files.isDirectory(ret)) {
throw new IllegalStateException("Could not find WMSA_HOME, either set environment variable or ensure /var/lib/wmsa exists");
}
return ret;
}
@SneakyThrows @SneakyThrows
@Singleton @Singleton
@Provides @Provides
public HikariDataSource provideConnection() { public HikariDataSource provideConnection() {
if (Boolean.getBoolean("data-store-h2")) { return getMariaDB();
return getH2();
}
else {
return getMariaDB();
}
} }
@SneakyThrows @SneakyThrows
private HikariDataSource getMariaDB() { private HikariDataSource getMariaDB() {
var connStr = dbProperties.getProperty(DB_CONN_KEY); var connStr = System.getProperty("db.overrideJdbc", dbProperties.getProperty(DB_CONN_KEY));
try { try {
HikariConfig config = new HikariConfig(); HikariConfig config = new HikariConfig();
@ -94,22 +97,4 @@ public class DatabaseModule extends AbstractModule {
} }
} }
@SneakyThrows
private HikariDataSource getH2() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:h2:~/wmsa-db");
config.setUsername("wmsa");
config.setPassword("");
var ds = new HikariDataSource(config);
try (var stream = ClassLoader.getSystemResourceAsStream("sql/data-store-init.sql")) {
RunScript.execute(ds.getConnection(), new InputStreamReader(stream));
}
try (var stream = ClassLoader.getSystemResourceAsStream("sql/edge-crawler-cache.sql")) {
RunScript.execute(ds.getConnection(), new InputStreamReader(stream));
}
return ds;
}
} }

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration.module; package nu.marginalia.service.module;
import com.google.inject.name.Named; import com.google.inject.name.Named;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration.module; package nu.marginalia.service.module;
import com.google.inject.name.Named; import com.google.inject.name.Named;

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration.server; package nu.marginalia.service.server;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import lombok.SneakyThrows; import lombok.SneakyThrows;
@ -22,11 +22,6 @@ public class Initialization {
initialized = true; initialized = true;
notifyAll(); notifyAll();
} }
if (Boolean.getBoolean("go-no-go")) {
logger.info("Self-test OK");
System.exit(0);
}
} }
public boolean isReady() { public boolean isReady() {
@ -41,7 +36,7 @@ public class Initialization {
while (!initialized) { while (!initialized) {
wait(); wait();
} }
return initialized; return true;
} }
} }
} }

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.configuration.server; package nu.marginalia.service.server;
import com.google.inject.Inject; import com.google.inject.Inject;
import com.google.inject.name.Named; import com.google.inject.name.Named;

View File

@ -1,10 +1,10 @@
package nu.marginalia.wmsa.configuration.server; package nu.marginalia.service.server;
import io.github.bucket4j.Bandwidth; import io.github.bucket4j.Bandwidth;
import io.github.bucket4j.Bucket; import io.github.bucket4j.Bucket;
import io.github.bucket4j.Bucket4j;
import io.github.bucket4j.Refill; import io.github.bucket4j.Refill;
import io.reactivex.rxjava3.schedulers.Schedulers; import io.reactivex.rxjava3.schedulers.Schedulers;
import nu.marginalia.client.Context;
import java.time.Duration; import java.time.Duration;
import java.util.Map; import java.util.Map;
@ -49,13 +49,11 @@ public class RateLimiter {
} }
public boolean isAllowed(Context ctx) { public boolean isAllowed(Context ctx) {
final Optional<String> maybeIp = ctx.getIpHash(); if (!ctx.isPublic()) { // Internal server->server request
if (maybeIp.isEmpty()) { // Internal server->server request
return true; return true;
} }
return bucketMap.computeIfAbsent(maybeIp.get(), return bucketMap.computeIfAbsent(ctx.getContextId(),
(ip) -> createBucket()).tryConsume(1); (ip) -> createBucket()).tryConsume(1);
} }

View File

@ -1,10 +1,9 @@
package nu.marginalia.wmsa.configuration.server; package nu.marginalia.service.server;
import com.google.common.base.Strings; import com.google.common.base.Strings;
import io.prometheus.client.Counter; import io.prometheus.client.Counter;
import nu.marginalia.wmsa.client.exception.MessagingException; import nu.marginalia.client.Context;
import org.apache.http.HttpStatus; import nu.marginalia.client.exception.MessagingException;
import org.apache.logging.log4j.ThreadContext;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.slf4j.Marker; import org.slf4j.Marker;
@ -17,6 +16,8 @@ import java.util.Optional;
public class Service { public class Service {
private final Logger logger = LoggerFactory.getLogger(getClass()); private final Logger logger = LoggerFactory.getLogger(getClass());
// Marker for filtering out sensitive content from the persistent logs
private final Marker httpMarker = MarkerFactory.getMarker("HTTP"); private final Marker httpMarker = MarkerFactory.getMarker("HTTP");
private final Initialization initialization; private final Initialization initialization;
@ -53,8 +54,8 @@ public class Service {
configureStaticFiles.run(); configureStaticFiles.run();
Spark.before(this::filterPublicRequests);
Spark.before(this::auditRequestIn); Spark.before(this::auditRequestIn);
Spark.before(this::filterPublicRequests);
Spark.after(this::auditRequestOut); Spark.after(this::auditRequestOut);
Spark.exception(MessagingException.class, this::handleException); Spark.exception(MessagingException.class, this::handleException);
@ -86,14 +87,14 @@ public class Service {
if (!request.pathInfo().startsWith("/public/")) { if (!request.pathInfo().startsWith("/public/")) {
logger.warn(httpMarker, "External connection to internal API: {} -> {} {}", context, request.requestMethod(), request.pathInfo()); logger.warn(httpMarker, "External connection to internal API: {} -> {} {}", context, request.requestMethod(), request.pathInfo());
Spark.halt(HttpStatus.SC_FORBIDDEN); Spark.halt(403);
} }
String url = request.pathInfo(); String url = request.pathInfo();
if (request.queryString() != null) { if (request.queryString() != null) {
url = url + "?" + request.queryString(); url = url + "?" + request.queryString();
} }
logger.info(httpMarker, "PUBLIC {}: {} {}", Context.fromRequest(request).getIpHash().orElse("?"), request.requestMethod(), url); logger.info(httpMarker, "PUBLIC {}: {} {}", Context.fromRequest(request).getContextId(), request.requestMethod(), url);
} }
private Object isInitialized(Request request, Response response) { private Object isInitialized(Request request, Response response) {
@ -101,7 +102,7 @@ public class Service {
return "ok"; return "ok";
} }
else { else {
response.status(HttpStatus.SC_FAILED_DEPENDENCY); response.status(424);
return "bad"; return "bad";
} }
} }
@ -115,21 +116,21 @@ public class Service {
return "ok"; return "ok";
} }
else { else {
response.status(HttpStatus.SC_FAILED_DEPENDENCY); response.status(424);
return "bad"; return "bad";
} }
} }
private void auditRequestIn(Request request, Response response) { private void auditRequestIn(Request request, Response response) {
request_counter.labels(serviceName).inc();
// Paint context // Paint context
if (!Strings.isNullOrEmpty(request.headers(Context.CONTEXT_HEADER))) { paintThreadName(request, "req:");
Context.fromRequest(request);
} request_counter.labels(serviceName).inc();
} }
private void auditRequestOut(Request request, Response response) { private void auditRequestOut(Request request, Response response) {
ThreadContext.clearMap();
paintThreadName(request, "rsp:");
if (response.status() < 400) { if (response.status() < 400) {
request_counter_good.labels(serviceName).inc(); request_counter_good.labels(serviceName).inc();
@ -143,6 +144,11 @@ public class Service {
} }
} }
private void paintThreadName(Request request, String prefix) {
var ctx = Context.fromRequest(request);
Thread.currentThread().setName(prefix + ctx.getContextId());
}
private void handleException(Exception ex, Request request, Response response) { private void handleException(Exception ex, Request request, Response response) {
request_counter_err.labels(serviceName).inc(); request_counter_err.labels(serviceName).inc();
if (ex instanceof MessagingException) { if (ex instanceof MessagingException) {

View File

@ -1,4 +1,4 @@
package nu.marginalia.wmsa.resource_store; package nu.marginalia.service.server;
import lombok.SneakyThrows; import lombok.SneakyThrows;
import spark.Request; import spark.Request;

Some files were not shown because too many files have changed in this diff Show More