(parquet-floor) Patch in support for writing and reading repeated values

This commit is contained in:
Viktor Lofgren 2023-09-11 14:06:43 +02:00
parent dbe974f510
commit a00cabe223
3 changed files with 52 additions and 23 deletions

View File

@ -20,11 +20,7 @@ import java.io.Closeable;
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.*;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer; import java.util.function.Consumer;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -195,8 +191,11 @@ public final class ParquetReader<U, S> implements Spliterator<S>, Closeable {
U record = hydrator.start(); U record = hydrator.start();
for (ColumnReader columnReader: this.currentRowGroupColumnReaders) { for (ColumnReader columnReader: this.currentRowGroupColumnReaders) {
do {
record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader)); record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader));
columnReader.consume(); columnReader.consume();
} while (columnReader.getCurrentRepetitionLevel() != 0);
if (columnReader.getCurrentRepetitionLevel() != 0) { if (columnReader.getCurrentRepetitionLevel() != 0) {
throw new IllegalStateException("Unexpected repetition"); throw new IllegalStateException("Unexpected repetition");
} }

View File

@ -18,6 +18,7 @@ import java.io.File;
import java.io.FileOutputStream; import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.util.Collections; import java.util.Collections;
import java.util.List;
public final class ParquetWriter<T> implements Closeable { public final class ParquetWriter<T> implements Closeable {
@ -108,7 +109,17 @@ public final class ParquetWriter<T> implements Closeable {
private static class SimpleWriteSupport<T> extends WriteSupport<T> { private static class SimpleWriteSupport<T> extends WriteSupport<T> {
private final MessageType schema; private final MessageType schema;
private final Dehydrator<T> dehydrator; private final Dehydrator<T> dehydrator;
private final ValueWriter valueWriter = SimpleWriteSupport.this::writeField; private final ValueWriter valueWriter = new ValueWriter() {
@Override
public void write(String name, Object value) {
SimpleWriteSupport.this.writeField(name, value);
}
@Override
public void writeList(String name, List<?> value) {
SimpleWriteSupport.this.writeList(name, value);
}
};
private RecordConsumer recordConsumer; private RecordConsumer recordConsumer;
@ -144,6 +155,22 @@ public final class ParquetWriter<T> implements Closeable {
PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType(); PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
recordConsumer.startField(name, fieldIndex); recordConsumer.startField(name, fieldIndex);
writeValue(type, value);
recordConsumer.endField(name, fieldIndex);
}
/**
 * Writes every element of {@code values} as one occurrence of the field {@code name}.
 *
 * <p>An empty list is written by omitting the field from the record entirely.
 * Opening a field with {@code startField} and closing it with {@code endField}
 * without adding at least one value in between is rejected by Parquet's record
 * consumer, so the start/end pair must not be emitted in that case.
 *
 * @param name   name of the field in the schema
 * @param values the elements to write, in order; must not be {@code null}
 */
private void writeList(String name, List<?> values) {
    // Omit the field completely for an empty list (see method comment); this check
    // also makes a null list fail before any field has been opened.
    if (values.isEmpty()) {
        return;
    }

    int fieldIndex = schema.getFieldIndex(name);
    PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();

    recordConsumer.startField(name, fieldIndex);
    for (Object value : values) {
        writeValue(type, value);
    }
    recordConsumer.endField(name, fieldIndex);
}
void writeValue(PrimitiveType type, Object value) {
switch (type.getPrimitiveTypeName()) { switch (type.getPrimitiveTypeName()) {
case INT32: recordConsumer.addInteger((int)value); break; case INT32: recordConsumer.addInteger((int)value); break;
case INT64: recordConsumer.addLong((long)value); break; case INT64: recordConsumer.addLong((long)value); break;
@ -154,13 +181,13 @@ public final class ParquetWriter<T> implements Closeable {
if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) { if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
recordConsumer.addBinary(Binary.fromString((String)value)); recordConsumer.addBinary(Binary.fromString((String)value));
} else { } else {
throw new UnsupportedOperationException("We don't support writing " + type.getLogicalTypeAnnotation()); throw new UnsupportedOperationException("We don't support writing logical annotation type " + type.getLogicalTypeAnnotation());
} }
break; break;
default: default:
throw new UnsupportedOperationException("We don't support writing " + type.getPrimitiveTypeName()); throw new UnsupportedOperationException("We don't support writing " + type.getPrimitiveTypeName());
} }
recordConsumer.endField(name, fieldIndex);
} }
} }
} }

View File

@ -1,5 +1,8 @@
package blue.strategic.parquet; package blue.strategic.parquet;
import java.util.List;
public interface ValueWriter { public interface ValueWriter {
void write(String name, Object value); void write(String name, Object value);
void writeList(String name, List<?> value);
} }