(service) Clean up thread pool creation

This commit is contained in:
Viktor Lofgren 2024-02-28 14:06:32 +01:00
parent 9a045a0588
commit 86bbc1043e
3 changed files with 41 additions and 42 deletions

View File

@ -0,0 +1,32 @@
package nu.marginalia.service;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
public class NamedExecutorFactory {
/** Create a new fixed thread pool with the given name and number of threads. */
public static ExecutorService createFixed(String name, int nThreads) {
return Executors.newFixedThreadPool(nThreads, new NamedThreadFactory(name));
}
private static class NamedThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String name;
private NamedThreadFactory(String name) {
this.name = name;
}
@Override
public Thread newThread(@NotNull Runnable r) {
var thread = new Thread(r, STR."\{name}[\{threadNumber.getAndIncrement()}]");
thread.setDaemon(true);
return thread;
}
}
}

View File

@ -4,19 +4,15 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import nu.marginalia.service.NamedExecutorFactory;
import nu.marginalia.service.NodeConfigurationWatcher;
import nu.marginalia.service.discovery.ServiceRegistryIf;
import nu.marginalia.service.discovery.property.PartitionTraits;
import nu.marginalia.service.discovery.property.ServiceEndpoint.InstanceAddress;
import nu.marginalia.service.discovery.property.ServiceKey;
import nu.marginalia.service.discovery.property.ServicePartition;
import org.jetbrains.annotations.NotNull;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@Singleton
@ -24,26 +20,10 @@ public class GrpcChannelPoolFactory {
private final NodeConfigurationWatcher nodeConfigurationWatcher;
private final ServiceRegistryIf serviceRegistryIf;
private static final Executor executor = Executors.newFixedThreadPool(
Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16), new ThreadFactory() {
static final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(@NotNull Runnable r) {
var thread = new Thread(r, STR."gRPC-Channel-Pool[\{threadNumber.getAndIncrement()}]");
thread.setDaemon(true);
return thread;
}
});
private static final Executor offloadExecutor = Executors.newFixedThreadPool(
Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16), new ThreadFactory() {
static final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(@NotNull Runnable r) {
var thread = new Thread(r, STR."gRPC-Offload-Executor[\{threadNumber.getAndIncrement()}]");
thread.setDaemon(true);
return thread;
}
});
private static final Executor executor = NamedExecutorFactory.createFixed("gRPC-Channel-Pool",
Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16));
private static final Executor offloadExecutor = NamedExecutorFactory.createFixed("gRPC-Offload-Pool",
Math.clamp(Runtime.getRuntime().availableProcessors() / 2, 2, 16));
@Inject
public GrpcChannelPoolFactory(NodeConfigurationWatcher nodeConfigurationWatcher,

View File

@ -7,6 +7,7 @@ import io.grpc.netty.shaded.io.netty.channel.socket.nio.NioServerSocketChannel;
import io.prometheus.client.Counter;
import lombok.SneakyThrows;
import nu.marginalia.mq.inbox.*;
import nu.marginalia.service.NamedExecutorFactory;
import nu.marginalia.service.discovery.property.*;
import nu.marginalia.service.id.ServiceId;
import nu.marginalia.service.server.mq.ServiceMqSubscription;
@ -133,9 +134,9 @@ public class Service {
// Start the gRPC server
var grpcServerBuilder = NettyServerBuilder.forAddress(new InetSocketAddress(config.bindAddress(), port))
.executor(namedExecutor("nettyExecutor", nThreads))
.workerEventLoopGroup(new NioEventLoopGroup(nThreads, namedExecutor("Worker-ELG", nThreads)))
.bossEventLoopGroup(new NioEventLoopGroup(nThreads, namedExecutor("Boss-ELG", nThreads)))
.executor(NamedExecutorFactory.createFixed("nettyExecutor", nThreads))
.workerEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Worker-ELG", nThreads)))
.bossEventLoopGroup(new NioEventLoopGroup(nThreads, NamedExecutorFactory.createFixed("Boss-ELG", nThreads)))
.channelType(NioServerSocketChannel.class);
for (var grpcService : grpcServices) {
@ -153,20 +154,6 @@ public class Service {
}
}
private ExecutorService namedExecutor(String name, int limit) {
return Executors.newFixedThreadPool(
limit,
new ThreadFactory() {
static final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
var thread = new Thread(r, STR."\{name}[\{threadNumber.getAndIncrement()}]");
thread.setDaemon(true);
return thread;
}
});
}
public Service(BaseServiceParams params,
ServicePartition partition,
List<BindableService> grpcServices) {