(parquet-floor) Modify the parquet library to permit list-fields.

This commit is contained in:
Viktor Lofgren 2023-09-13 15:56:35 +02:00
parent 064bc5ee76
commit 9f672a0cf4
4 changed files with 72 additions and 1 deletion

View file

@ -16,6 +16,8 @@ dependencies {
exclude group: 'commons-pool', module: 'commons-pool'
transitive = true
}
implementation libs.trove
}
test {

View file

@ -192,7 +192,10 @@ public final class ParquetReader<U, S> implements Spliterator<S>, Closeable {
U record = hydrator.start();
for (ColumnReader columnReader: this.currentRowGroupColumnReaders) {
do {
record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader));
var value = readValue(columnReader);
if (value != null) {
record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], value);
}
columnReader.consume();
} while (columnReader.getCurrentRepetitionLevel() != 0);

View file

@ -1,5 +1,7 @@
package blue.strategic.parquet;
import gnu.trove.list.TIntList;
import gnu.trove.list.TLongList;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
@ -20,6 +22,9 @@ import java.io.IOException;
import java.util.Collections;
import java.util.List;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
public final class ParquetWriter<T> implements Closeable {
private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
@ -117,6 +122,28 @@ public final class ParquetWriter<T> implements Closeable {
/**
 * Forwards a list-valued field to the enclosing write support.
 * An empty list is not forwarded at all.
 *
 * @param name  name of the field to write
 * @param value elements to write for that field
 */
@Override
public void writeList(String name, List<?> value) {
    // Only non-empty lists reach the enclosing writer.
    if (!value.isEmpty()) {
        SimpleWriteSupport.this.writeList(name, value);
    }
}
/**
 * Forwards a list of primitive longs to the enclosing write support.
 * An empty list is not forwarded at all.
 *
 * @param name  name of the field to write
 * @param value long elements to write for that field
 */
@Override
public void writeList(String name, TLongList value) {
    // Only non-empty lists reach the enclosing writer.
    if (!value.isEmpty()) {
        SimpleWriteSupport.this.writeList(name, value);
    }
}
/**
 * Forwards a list of primitive ints to the enclosing write support.
 * An empty list is not forwarded at all.
 *
 * @param name  name of the field to write
 * @param value int elements to write for that field
 */
@Override
public void writeList(String name, TIntList value) {
    // Only non-empty lists reach the enclosing writer.
    if (!value.isEmpty()) {
        SimpleWriteSupport.this.writeList(name, value);
    }
}
};
@ -170,6 +197,40 @@ public final class ParquetWriter<T> implements Closeable {
recordConsumer.endField(name, fieldIndex);
}
/**
 * Writes every element of {@code values}, in index order, as the values of the
 * field {@code name}, bracketed by a single startField/endField pair.
 *
 * <p>This method does not special-case an empty list; the delegating
 * {@code writeList(String, TLongList)} overload visible in this file returns
 * early for empty lists before calling here.
 *
 * @param name   name of the field, resolved to an index through the schema
 * @param values long values to emit
 * @throws IllegalArgumentException if the field's primitive type is not INT64
 */
private void writeList(String name, TLongList values) {
    int fieldIndex = schema.getFieldIndex(name);
    PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();

    // writeValue(type, long) guards the column type with an assert only, which
    // is skipped unless the JVM runs with -ea. Check it unconditionally here,
    // once per list and before the consumer has been told a field started.
    if (type.getPrimitiveTypeName() != PrimitiveType.PrimitiveTypeName.INT64) {
        throw new IllegalArgumentException(
                "Cannot write a list of longs to field '" + name + "' of type "
                        + type.getPrimitiveTypeName() + " (expected INT64)");
    }

    recordConsumer.startField(name, fieldIndex);
    for (int i = 0; i < values.size(); i++) {
        writeValue(type, values.get(i));
    }
    recordConsumer.endField(name, fieldIndex);
}
/**
 * Writes every element of {@code values}, in index order, as the values of the
 * field {@code name}, bracketed by a single startField/endField pair.
 *
 * <p>This method does not special-case an empty list; the delegating
 * {@code writeList(String, TIntList)} overload visible in this file returns
 * early for empty lists before calling here.
 *
 * @param name   name of the field, resolved to an index through the schema
 * @param values int values to emit
 * @throws IllegalArgumentException if the field's primitive type is not INT32
 */
private void writeList(String name, TIntList values) {
    int fieldIndex = schema.getFieldIndex(name);
    PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();

    // writeValue(type, int) guards the column type with an assert only, which
    // is skipped unless the JVM runs with -ea. Check it unconditionally here,
    // once per list and before the consumer has been told a field started.
    if (type.getPrimitiveTypeName() != PrimitiveType.PrimitiveTypeName.INT32) {
        throw new IllegalArgumentException(
                "Cannot write a list of ints to field '" + name + "' of type "
                        + type.getPrimitiveTypeName() + " (expected INT32)");
    }

    recordConsumer.startField(name, fieldIndex);
    for (int i = 0; i < values.size(); i++) {
        writeValue(type, values.get(i));
    }
    recordConsumer.endField(name, fieldIndex);
}
/**
 * Emits one primitive long to the record consumer.
 *
 * @param type  primitive type of the target field; asserted to be INT64
 *              (the assertion is only evaluated when assertions are enabled)
 * @param value the long to emit
 */
void writeValue(PrimitiveType type, long value) {
    assert INT64.equals(type.getPrimitiveTypeName());

    recordConsumer.addLong(value);
}
/**
 * Emits one primitive int to the record consumer.
 *
 * @param type  primitive type of the target field; asserted to be INT32
 *              (the assertion is only evaluated when assertions are enabled)
 * @param value the int to emit
 */
void writeValue(PrimitiveType type, int value) {
    assert INT32.equals(type.getPrimitiveTypeName());

    recordConsumer.addInteger(value);
}
void writeValue(PrimitiveType type, Object value) {
switch (type.getPrimitiveTypeName()) {
case INT32: recordConsumer.addInteger((int)value); break;

View file

@ -1,8 +1,13 @@
package blue.strategic.parquet;
import gnu.trove.list.TIntList;
import gnu.trove.list.TLongList;
import java.util.List;
/**
 * Accepts the fields of a record as (name, value) pairs, with dedicated
 * overloads for list-valued fields.
 *
 * <p>NOTE(review): presumably {@code name} must match a field of the schema
 * being written and this is the callback handed to dehydrators; confirm
 * against the implementation in {@code ParquetWriter}.
 */
public interface ValueWriter {
/**
 * Writes a single value for the named field.
 *
 * @param name  name of the field
 * @param value value to write
 */
void write(String name, Object value);
/**
 * Writes a list of boxed values for the named field.
 *
 * @param name  name of the field
 * @param value elements to write
 */
void writeList(String name, List<?> value);
/**
 * Writes a Trove list of primitive longs for the named field.
 *
 * @param name  name of the field
 * @param value long elements to write
 */
void writeList(String name, TLongList value);
/**
 * Writes a Trove list of primitive ints for the named field.
 *
 * @param name  name of the field
 * @param value int elements to write
 */
void writeList(String name, TIntList value);
}