From a00cabe223eaff2cf2ed74593212c506bad90f1e Mon Sep 17 00:00:00 2001 From: Viktor Lofgren Date: Mon, 11 Sep 2023 14:06:43 +0200 Subject: [PATCH] (parquet-floor) Patch in support for writing and reading repeated values --- .../blue/strategic/parquet/ParquetReader.java | 13 ++-- .../blue/strategic/parquet/ParquetWriter.java | 59 ++++++++++++++----- .../blue/strategic/parquet/ValueWriter.java | 3 + 3 files changed, 52 insertions(+), 23 deletions(-) diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java index 4ebcfe60..3eee03d0 100644 --- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java @@ -20,11 +20,7 @@ import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Set; -import java.util.Spliterator; +import java.util.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.stream.Collectors; @@ -195,8 +191,11 @@ public final class ParquetReader implements Spliterator, Closeable { U record = hydrator.start(); for (ColumnReader columnReader: this.currentRowGroupColumnReaders) { - record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader)); - columnReader.consume(); + do { + record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader)); + columnReader.consume(); + } while (columnReader.getCurrentRepetitionLevel() != 0); + if (columnReader.getCurrentRepetitionLevel() != 0) { throw new IllegalStateException("Unexpected repetition"); } diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java index 68d4ba76..7840c49e 100644 --- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java @@ -18,6 +18,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.util.Collections; +import java.util.List; public final class ParquetWriter implements Closeable { @@ -108,7 +109,17 @@ public final class ParquetWriter implements Closeable { private static class SimpleWriteSupport extends WriteSupport { private final MessageType schema; private final Dehydrator dehydrator; - private final ValueWriter valueWriter = SimpleWriteSupport.this::writeField; + private final ValueWriter valueWriter = new ValueWriter() { + @Override + public void write(String name, Object value) { + SimpleWriteSupport.this.writeField(name, value); + } + + @Override + public void writeList(String name, List value) { + SimpleWriteSupport.this.writeList(name, value); + } + }; private RecordConsumer recordConsumer; @@ -144,23 +155,39 @@ public final class ParquetWriter implements Closeable { PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType(); recordConsumer.startField(name, fieldIndex); - switch (type.getPrimitiveTypeName()) { - case INT32: recordConsumer.addInteger((int)value); break; - case INT64: recordConsumer.addLong((long)value); break; - case DOUBLE: recordConsumer.addDouble((double)value); break; - case BOOLEAN: recordConsumer.addBoolean((boolean)value); break; - case FLOAT: recordConsumer.addFloat((float)value); break; - case BINARY: - if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) { - recordConsumer.addBinary(Binary.fromString((String)value)); - } else { - throw new UnsupportedOperationException("We don't support writing " + type.getLogicalTypeAnnotation()); - } - break; - default: - throw new UnsupportedOperationException("We don't support writing " + type.getPrimitiveTypeName()); + writeValue(type, value); + + recordConsumer.endField(name, fieldIndex); + } + + private void writeList(String name, List values) { + int fieldIndex = schema.getFieldIndex(name); + PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType(); + recordConsumer.startField(name, fieldIndex); + for (var value : values) { + writeValue(type, value); } recordConsumer.endField(name, fieldIndex); } + + void writeValue(PrimitiveType type, Object value) { + switch (type.getPrimitiveTypeName()) { + case INT32: recordConsumer.addInteger((int)value); break; + case INT64: recordConsumer.addLong((long)value); break; + case DOUBLE: recordConsumer.addDouble((double)value); break; + case BOOLEAN: recordConsumer.addBoolean((boolean)value); break; + case FLOAT: recordConsumer.addFloat((float)value); break; + case BINARY: + if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) { + recordConsumer.addBinary(Binary.fromString((String)value)); + } else { + throw new UnsupportedOperationException("We don't support writing logical annotation type " + type.getLogicalTypeAnnotation()); + } + break; + default: + throw new UnsupportedOperationException("We don't support writing " + type.getPrimitiveTypeName()); + } + + } } } diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java index cf8cce3a..e8cda912 100644 --- a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java @@ -1,5 +1,8 @@ package blue.strategic.parquet; +import java.util.List; + public interface ValueWriter { void write(String name, Object value); + void writeList(String name, List value); }