(mq) Add better error handling in fsm and mq

java.lang.Error:s were not handled properly, leading to mismatch in the bookkeeping of the FSMs.  These are now caught, acted on, and re-thrown.

MqSynchronousInbox also no longer assumes all exceptions are InterruptedException.
This commit is contained in:
Viktor Lofgren 2024-01-22 12:58:33 +01:00
parent 6a1bfd6270
commit 3a325845c7
5 changed files with 9 additions and 96 deletions

View File

@ -288,6 +288,11 @@ public class ActorStateMachine {
logger.error("Error in state machine transition", e);
setErrorState();
}
catch (Error e) {
setErrorState();
logger.error("Error in state machine transition", e);
throw e;
}
}
private void setErrorState() {

View File

@ -133,9 +133,12 @@ public class MqSynchronousInbox implements MqInboxIf {
currentTask = executorService.submit(() -> handleMessage(msg));
currentTask.get();
}
catch (Exception ex) {
catch (InterruptedException ex) {
logger.error("Inbox task was aborted");
}
catch (Exception ex) {
logger.error("Exception handling inbox task", ex);
}
finally {
currentTask = null;
}

View File

@ -1,17 +0,0 @@
plugins {
id 'java'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(21))
}
}
dependencies {
implementation libs.ffi
}
test {
useJUnitPlatform()
}

View File

@ -1,6 +0,0 @@
# Uppend
[Uppend](https://github.com/upserve/uppend) - MIT
It's "an append-only, key-multivalue store". Cool project, but we're unceremoniously pillaging just a small piece of
code they did for calling [memadvise()](https://man7.org/linux/man-pages/man2/madvise.2.html) on off-heap byte buffers.

View File

@ -1,72 +0,0 @@
package com.upserve.uppend.blobs;
import com.kenai.jffi.MemoryIO;
import jnr.ffi.LibraryLoader;
import jnr.ffi.types.size_t;
import java.io.IOException;
import java.nio.MappedByteBuffer;
// https://github.com/upserve/uppend/blob/70967c6f24d7f1a3bbc18799f485d981da93f53b/src/main/java/com/upserve/uppend/blobs/NativeIO.java
// MIT License
public class NativeIO {
private static final NativeC nativeC = LibraryLoader.create(NativeC.class).load("c");
public static final int pageSize = nativeC.getpagesize(); // 4096 on most Linux
public enum Advice {
// These seem to be fairly stable https://github.com/torvalds/linux
// TODO add to https://github.com/jnr/jnr-constants
Normal(0), Random(1), Sequential(2), WillNeed(3), DontNeed(4);
private final int value;
Advice(int val) {
this.value = val;
}
}
public interface NativeC {
int madvise(@size_t long address, @size_t long size, int advice);
int getpagesize();
}
static long alignedAddress(long address) {
return address & (- pageSize);
}
static long alignedSize(long address, int capacity) {
long end = address + capacity;
end = (end + pageSize - 1) & (-pageSize);
return end - alignedAddress(address);
}
public static void madvise(MappedByteBuffer buffer, Advice advice) throws IOException {
final long address = MemoryIO.getInstance().getDirectBufferAddress(buffer);
final int capacity = buffer.capacity();
long alignedAddress = alignedAddress(address);
long alignedSize = alignedSize(alignedAddress, capacity);
int val = nativeC.madvise(alignedAddress, alignedSize, advice.value);
if (val != 0) {
throw new IOException(String.format("System call madvise failed with code: %d", val));
}
}
public static void madviseRange(MappedByteBuffer buffer, Advice advice, long offset, int length) throws IOException {
final long address = MemoryIO.getInstance().getDirectBufferAddress(buffer);
long alignedAddress = alignedAddress(address+offset);
long alignedSize = alignedSize(alignedAddress, length);
int val = nativeC.madvise(alignedAddress, alignedSize, advice.value);
if (val != 0) {
throw new IOException(String.format("System call madvise failed with code: %d", val));
}
}
}