diff --git a/README.md b/README.md index ff9c1aa8..58f84e55 100644 --- a/README.md +++ b/README.md @@ -22,9 +22,12 @@ To set up a local test environment, follow the instructions in [📄 run/readme. ## Hardware Requirements -A production-like environment requires at least 128 Gb of RAM and ideally 2 Tb+ of enterprise -grade SSD storage, as well as some additional terabytes of slower harddrives for storing crawl -data. It can be made to run on smaller hardware by limiting size of the index. +A production-like environment requires a lot of RAM and ideally enterprise SSDs for +the index, as well as some additional terabytes of slower hard drives for storing crawl +data. It can be made to run on smaller hardware by limiting the size of the index. + +The system will definitely run on a 32 GB machine, possibly smaller, but at that size it may not perform +very well as it relies on disk caching to be fast. A local developer's deployment is possible with much smaller hardware (and index size). diff --git a/settings.gradle b/settings.gradle index 3e82b3a7..18c86ddd 100644 --- a/settings.gradle +++ b/settings.gradle @@ -83,6 +83,7 @@ include 'third-party:count-min-sketch' include 'third-party:monkey-patch-opennlp' include 'third-party:monkey-patch-gson' include 'third-party:commons-codec' +include 'third-party:parquet-floor' dependencyResolutionManagement { diff --git a/third-party/parquet-floor/build.gradle b/third-party/parquet-floor/build.gradle new file mode 100644 index 00000000..d286c43d --- /dev/null +++ b/third-party/parquet-floor/build.gradle @@ -0,0 +1,20 @@ +plugins { + id 'java' +} + +java { + toolchain { + languageVersion.set(JavaLanguageVersion.of(20)) + } +} + +dependencies { + implementation 'org.apache.parquet:parquet-column:1.13.1' + implementation('org.apache.parquet:parquet-hadoop:1.13.1') { + exclude group: 'commons-pool', module: 'commons-pool' + } +} + +test { + useJUnitPlatform() +} diff --git a/third-party/parquet-floor/readme.md b/third-party/parquet-floor/readme.md new file mode 100644 index 00000000..b1e21c40 --- /dev/null +++ b/third-party/parquet-floor/readme.md @@ -0,0 +1,8 @@ +# Parquet Floor + +License: APL 2.0 + +Git: https://github.com/strategicblue/parquet-floor + +It's basically an adaptor for Parquet I/O without +needing to pull half of Hadoop into your project. diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/Dehydrator.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/Dehydrator.java new file mode 100644 index 00000000..9391b20e --- /dev/null +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/Dehydrator.java @@ -0,0 +1,14 @@ +package blue.strategic.parquet; + + +/** + * Dehydrates a rich Java object into a Parquet row. + */ +public interface Dehydrator<T> { + /** + * Write the specified record into the Parquet row using the supplied writer. + * @param record the rich Java object + * @param valueWriter facilitates writing to the Parquet row + */ + void dehydrate(T record, ValueWriter valueWriter); +} diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/Hydrator.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/Hydrator.java new file mode 100644 index 00000000..b8410617 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/Hydrator.java @@ -0,0 +1,29 @@ +package blue.strategic.parquet; + +/** + * Creates and hydrates a rich domain object from a Parquet row.
+ */ +public interface Hydrator<U, S> { + + /** + * Creates a new mutable instance to be hydrated. + * @return new instance to be hydrated + */ + U start(); + + /** + * Hydrates the target instance by applying the specified value from the Parquet row. + * @param target object being hydrated + * @param heading the name of the column whose value is being applied + * @param value the value to apply + * @return the new target + */ + U add(U target, String heading, Object value); + + /** + * Seals the mutable hydration target. + * @param target object being hydrated + * @return the sealed object + */ + S finish(U target); +} diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/HydratorSupplier.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/HydratorSupplier.java new file mode 100644 index 00000000..fba801c1 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/HydratorSupplier.java @@ -0,0 +1,20 @@ +package blue.strategic.parquet; + +import org.apache.parquet.column.ColumnDescriptor; + +import java.util.List; + +/** + * Supplies hydrators. + */ +public interface HydratorSupplier<U, S> { + /** + * Supplies a hydrator from the specified list of columns. Values will always be added to the hydrator + * in the same order as the columns supplied to this function. + */ + Hydrator<U, S> get(List<ColumnDescriptor> columns); + + static <U, S> HydratorSupplier<U, S> constantly(final Hydrator<U, S> hydrator) { + return columns -> hydrator; + } +} diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java new file mode 100644 index 00000000..4ebcfe60 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetReader.java @@ -0,0 +1,260 @@ +package blue.strategic.parquet; + +import org.apache.parquet.column.ColumnDescriptor; +import org.apache.parquet.column.ColumnReadStore; +import org.apache.parquet.column.ColumnReader; +import org.apache.parquet.column.impl.ColumnReadStoreImpl; +import org.apache.parquet.column.page.PageReadStore; +import org.apache.parquet.example.DummyRecordConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.metadata.FileMetaData; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.io.DelegatingSeekableInputStream; +import org.apache.parquet.io.InputFile; +import org.apache.parquet.io.SeekableInputStream; +import org.apache.parquet.io.api.GroupConverter; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; + +import java.io.Closeable; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.Spliterator; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public final class ParquetReader implements Spliterator, Closeable { + private final ParquetFileReader reader; + private final Hydrator hydrator; + private final List columns; + private final MessageType schema; + private final GroupConverter recordConverter; + private final String createdBy; + + private boolean finished; + private long currentRowGroupSize = -1L; + private List currentRowGroupColumnReaders; + private long currentRowIndex = -1L; + +
public static Stream streamContent(File file, HydratorSupplier hydrator) throws IOException { + return streamContent(file, hydrator, null); + } + + public static Stream streamContent(File file, HydratorSupplier hydrator, Collection columns) throws IOException { + return streamContent(makeInputFile(file), hydrator, columns); + } + + public static Stream streamContent(InputFile file, HydratorSupplier hydrator) throws IOException { + return streamContent(file, hydrator, null); + } + + public static Stream streamContent(InputFile file, HydratorSupplier hydrator, Collection columns) throws IOException { + return stream(spliterator(file, hydrator, columns)); + } + + public static ParquetReader spliterator(File file, HydratorSupplier hydrator) throws IOException { + return spliterator(file, hydrator, null); + } + + public static ParquetReader spliterator(File file, HydratorSupplier hydrator, Collection columns) throws IOException { + return spliterator(makeInputFile(file), hydrator, columns); + } + + public static ParquetReader spliterator(InputFile file, HydratorSupplier hydrator) throws IOException { + return spliterator(file, hydrator, null); + } + + public static ParquetReader spliterator(InputFile file, HydratorSupplier hydrator, Collection columns) throws IOException { + Set columnSet = (null == columns) ? Collections.emptySet() : Set.copyOf(columns); + return new ParquetReader<>(file, columnSet, hydrator); + } + + public static Stream stream(ParquetReader reader) { + return StreamSupport + .stream(reader, false) + .onClose(() -> closeSilently(reader)); + } + + public static Stream streamContentToStrings(File file) throws IOException { + return stream(spliterator(makeInputFile(file), columns -> { + final AtomicInteger pos = new AtomicInteger(0); + return new Hydrator() { + @Override + public String[] start() { + return new String[columns.size()]; + } + + @Override + public String[] add(String[] target, String heading, Object value) { + target[pos.getAndIncrement()] = heading + "=" + value.toString(); + return target; + } + + @Override + public String[] finish(String[] target) { + return target; + } + }; + }, null)); + } + + public static ParquetMetadata readMetadata(File file) throws IOException { + return readMetadata(makeInputFile(file)); + } + + public static ParquetMetadata readMetadata(InputFile file) throws IOException { + try (ParquetFileReader reader = ParquetFileReader.open(file)) { + return reader.getFooter(); + } + } + + private ParquetReader(InputFile file, Set columnNames, HydratorSupplier hydratorSupplier) throws IOException { + this.reader = ParquetFileReader.open(file); + FileMetaData meta = reader.getFooter().getFileMetaData(); + this.schema = meta.getSchema(); + this.recordConverter = new DummyRecordConverter(this.schema).getRootConverter(); + this.createdBy = meta.getCreatedBy(); + + this.columns = schema.getColumns().stream() + .filter(c -> columnNames.isEmpty() || columnNames.contains(c.getPath()[0])) + .collect(Collectors.toList()); + + this.hydrator = hydratorSupplier.get(this.columns); + } + + private static void closeSilently(Closeable resource) { + try { + resource.close(); + } catch (Exception e) { + // ignore + } + } + + private static Object readValue(ColumnReader columnReader) { + ColumnDescriptor column = columnReader.getDescriptor(); + PrimitiveType primitiveType = column.getPrimitiveType(); + int maxDefinitionLevel = column.getMaxDefinitionLevel(); + + if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) { + switch 
(primitiveType.getPrimitiveTypeName()) { + case BINARY: + case FIXED_LEN_BYTE_ARRAY: + case INT96: + return primitiveType.stringifier().stringify(columnReader.getBinary()); + case BOOLEAN: + return columnReader.getBoolean(); + case DOUBLE: + return columnReader.getDouble(); + case FLOAT: + return columnReader.getFloat(); + case INT32: + return columnReader.getInteger(); + case INT64: + return columnReader.getLong(); + default: + throw new IllegalArgumentException("Unsupported type: " + primitiveType); + } + } else { + return null; + } + } + + @Override + public void close() throws IOException { + reader.close(); + } + + @Override + public boolean tryAdvance(Consumer action) { + try { + if (this.finished) { + return false; + } + + if (currentRowIndex == currentRowGroupSize) { + PageReadStore rowGroup = reader.readNextRowGroup(); + if (rowGroup == null) { + this.finished = true; + return false; + } + + ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup, this.recordConverter, this.schema, this.createdBy); + + this.currentRowGroupSize = rowGroup.getRowCount(); + this.currentRowGroupColumnReaders = columns.stream().map(columnReadStore::getColumnReader).collect(Collectors.toList()); + this.currentRowIndex = 0L; + } + + U record = hydrator.start(); + for (ColumnReader columnReader: this.currentRowGroupColumnReaders) { + record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader)); + columnReader.consume(); + if (columnReader.getCurrentRepetitionLevel() != 0) { + throw new IllegalStateException("Unexpected repetition"); + } + } + + action.accept(hydrator.finish(record)); + this.currentRowIndex++; + + return true; + } catch (Exception e) { + throw new RuntimeException("Failed to read parquet", e); + } + } + + @Override + public Spliterator trySplit() { + return null; + } + + @Override + public long estimateSize() { + return reader.getRecordCount(); + } + + @Override + public int characteristics() { + return ORDERED | NONNULL | DISTINCT; + } + + public ParquetMetadata metaData() { + return this.reader.getFooter(); + } + + public static InputFile makeInputFile(File file) { + return new InputFile() { + @Override + public long getLength() { + return file.length(); + } + + @Override + public SeekableInputStream newStream() throws IOException { + FileInputStream fis = new FileInputStream(file); + return new DelegatingSeekableInputStream(fis) { + private long position; + + @Override + public long getPos() { + return position; + } + + @Override + public void seek(long newPos) throws IOException { + fis.getChannel().position(newPos); + position = newPos; + } + }; + } + }; + } +} diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java new file mode 100644 index 00000000..7d75b057 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ParquetWriter.java @@ -0,0 +1,166 @@ +package blue.strategic.parquet; + +import org.apache.hadoop.conf.Configuration; +import org.apache.parquet.column.ParquetProperties; +import org.apache.parquet.hadoop.api.WriteSupport; +import org.apache.parquet.hadoop.metadata.CompressionCodecName; +import org.apache.parquet.io.DelegatingPositionOutputStream; +import org.apache.parquet.io.OutputFile; +import org.apache.parquet.io.PositionOutputStream; +import org.apache.parquet.io.api.Binary; +import org.apache.parquet.io.api.RecordConsumer; +import 
org.apache.parquet.schema.LogicalTypeAnnotation; +import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.PrimitiveType; + +import java.io.Closeable; +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.util.Collections; + +public final class ParquetWriter implements Closeable { + + private final org.apache.parquet.hadoop.ParquetWriter writer; + + public static ParquetWriter writeFile(MessageType schema, File out, Dehydrator dehydrator) throws IOException { + OutputFile f = new OutputFile() { + @Override + public PositionOutputStream create(long blockSizeHint) throws IOException { + return createOrOverwrite(blockSizeHint); + } + + @Override + public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException { + FileOutputStream fos = new FileOutputStream(out); + return new DelegatingPositionOutputStream(fos) { + @Override + public long getPos() throws IOException { + return fos.getChannel().position(); + } + }; + } + + @Override + public boolean supportsBlockSize() { + return false; + } + + @Override + public long defaultBlockSize() { + return 1024L; + } + }; + return writeOutputFile(schema, f, dehydrator); + } + + private static ParquetWriter writeOutputFile(MessageType schema, OutputFile file, Dehydrator dehydrator) throws IOException { + return new ParquetWriter<>(file, schema, dehydrator); + } + + private ParquetWriter(OutputFile outputFile, MessageType schema, Dehydrator dehydrator) throws IOException { + this.writer = new Builder(outputFile) + .withType(schema) + .withDehydrator(dehydrator) + .withCompressionCodec(CompressionCodecName.SNAPPY) + .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0) + .build(); + } + + public void write(T record) throws IOException { + writer.write(record); + } + + @Override + public void close() throws IOException { + this.writer.close(); + } + + private static final class Builder extends org.apache.parquet.hadoop.ParquetWriter.Builder> { + private MessageType schema; + private Dehydrator dehydrator; + + private Builder(OutputFile file) { + super(file); + } + + public ParquetWriter.Builder withType(MessageType schema) { + this.schema = schema; + return this; + } + + public ParquetWriter.Builder withDehydrator(Dehydrator dehydrator) { + this.dehydrator = dehydrator; + return this; + } + + @Override + protected ParquetWriter.Builder self() { + return this; + } + + @Override + protected WriteSupport getWriteSupport(Configuration conf) { + return new SimpleWriteSupport<>(schema, dehydrator); + } + } + + private static class SimpleWriteSupport extends WriteSupport { + private final MessageType schema; + private final Dehydrator dehydrator; + private final ValueWriter valueWriter = SimpleWriteSupport.this::writeField; + + private RecordConsumer recordConsumer; + + SimpleWriteSupport(MessageType schema, Dehydrator dehydrator) { + this.schema = schema; + this.dehydrator = dehydrator; + } + + @Override + public WriteContext init(Configuration configuration) { + return new WriteContext(schema, Collections.emptyMap()); + } + + @Override + public void prepareForWrite(RecordConsumer recordConsumer) { + this.recordConsumer = recordConsumer; + } + + @Override + public void write(T record) { + recordConsumer.startMessage(); + dehydrator.dehydrate(record, valueWriter); + recordConsumer.endMessage(); + } + + @Override + public String getName() { + return "blue.strategic.parquet.ParquetWriter"; + } + + private void writeField(String name, Object value) { + int fieldIndex = 
schema.getFieldIndex(name); + PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType(); + recordConsumer.startField(name, fieldIndex); + + switch (type.getPrimitiveTypeName()) { + case INT32: recordConsumer.addInteger((int)value); break; + case INT64: recordConsumer.addLong((long)value); break; + case DOUBLE: recordConsumer.addDouble((double)value); break; + case BOOLEAN: recordConsumer.addBoolean((boolean)value); break; + case FLOAT: recordConsumer.addFloat((float)value); break; + case BINARY: + if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) { + recordConsumer.addBinary(Binary.fromString((String)value)); + } else { + throw new UnsupportedOperationException("We don't support writing " + type.getLogicalTypeAnnotation()); + } + break; + default: + throw new UnsupportedOperationException("We don't support writing " + type.getPrimitiveTypeName()); + } + recordConsumer.endField(name, fieldIndex); + } + } +} diff --git a/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java new file mode 100644 index 00000000..cf8cce3a --- /dev/null +++ b/third-party/parquet-floor/src/main/java/blue/strategic/parquet/ValueWriter.java @@ -0,0 +1,5 @@ +package blue.strategic.parquet; + +public interface ValueWriter { + void write(String name, Object value); +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/conf/Configurable.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/conf/Configurable.java new file mode 100644 index 00000000..f7ca25f6 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/conf/Configurable.java @@ -0,0 +1,5 @@ +package org.apache.hadoop.conf; + +public interface Configurable { + void setConf(Configuration conf); +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/conf/Configuration.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/conf/Configuration.java new file mode 100644 index 00000000..a9c3231d --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/conf/Configuration.java @@ -0,0 +1,19 @@ +package org.apache.hadoop.conf; + +public class Configuration { + + public boolean getBoolean(String x, boolean y) { + return y; + } + + public void setBoolean(String x, boolean y) { + } + + public int getInt(String x, int y) { + return y; + } + + public String get(String x) { + return null; + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java new file mode 100644 index 00000000..f51a64e5 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -0,0 +1,51 @@ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.io.RandomAccessFile; + +public class FSDataInputStream extends InputStream { + private final RandomAccessFile input; + + public FSDataInputStream(org.apache.hadoop.fs.Path p) throws FileNotFoundException { + this.input = new RandomAccessFile(p.file(), "r"); + } + + @Override + public int read() throws IOException { + return input.read(); + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + try { + input.readFully(buf, off, len); + return len; + } catch (IOException e) { + e.printStackTrace(); + return -1; + } + } + + public 
void seek(long pos) { + try { + input.seek(pos); + } catch (IOException e) { + e.printStackTrace(); + } + } + + public void readFully(byte[] buf, int a, int b) { + try { + input.readFully(buf, a, b); + } catch (IOException e) { + e.printStackTrace(); + } + } + + @Override + public void close() throws IOException { + input.close(); + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java new file mode 100644 index 00000000..fb065899 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -0,0 +1,28 @@ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.OutputStream; +import java.io.RandomAccessFile; + +public class FSDataOutputStream extends OutputStream { + private final RandomAccessFile output; + + public FSDataOutputStream(org.apache.hadoop.fs.Path p) throws FileNotFoundException { + this.output = new RandomAccessFile(p.file(), "rw"); + } + + @Override + public void write(int b) throws IOException { + this.output.write(b); + } + + @Override + public void close() throws IOException { + output.close(); + } + + public long getPos() throws IOException { + return this.output.getFilePointer(); + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FileStatus.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FileStatus.java new file mode 100644 index 00000000..4ba53fcb --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FileStatus.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.fs; + +public class FileStatus { + private final org.apache.hadoop.fs.Path path; + + public FileStatus(org.apache.hadoop.fs.Path p) { + path = p; + } + + public boolean isFile() { + return true; + } + + public org.apache.hadoop.fs.Path getPath() { + return path; + } + + public long getLen() { + return path.file().length(); + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FileSystem.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FileSystem.java new file mode 100644 index 00000000..c725b460 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/FileSystem.java @@ -0,0 +1,51 @@ +package org.apache.hadoop.fs; + +import java.io.FileNotFoundException; +import java.net.URI; +import java.net.URISyntaxException; + +public class FileSystem { + + public FileStatus getFileStatus(org.apache.hadoop.fs.Path p) { + return new FileStatus(p); + } + + public org.apache.hadoop.fs.Path makeQualified(org.apache.hadoop.fs.Path p) { + return p; + } + + public URI getUri() { + try { + return new URI("http://localhost/"); + } catch (URISyntaxException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public short getDefaultReplication(org.apache.hadoop.fs.Path p) { + return 0; + } + + public long getDefaultBlockSize(org.apache.hadoop.fs.Path p) { + return 1024; + } + + public FSDataInputStream open(org.apache.hadoop.fs.Path p) { + try { + return new FSDataInputStream(p); + } catch (FileNotFoundException e) { + e.printStackTrace(); + throw new RuntimeException(e); + } + } + + public org.apache.hadoop.fs.FSDataOutputStream create(org.apache.hadoop.fs.Path p, boolean a, int b, short c, long d) { + try { + return new FSDataOutputStream(p); + } catch (FileNotFoundException e) { + e.printStackTrace(); + throw new 
RuntimeException(e); + } + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/Path.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/Path.java new file mode 100644 index 00000000..e2392459 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/Path.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.fs; + +import org.apache.hadoop.conf.Configuration; + +import java.io.File; + +public class Path { + + private final File file; + + public Path(String path) { + file = new File(path); + } + + public FileSystem getFileSystem(Configuration conf) { + return new FileSystem(); + } + + public File file() { + return file; + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/PathFilter.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/PathFilter.java new file mode 100644 index 00000000..90ab8b39 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/PathFilter.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.fs; + +public interface PathFilter { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/PositionedReadable.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/PositionedReadable.java new file mode 100644 index 00000000..6ac1b55d --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/PositionedReadable.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.EOFException; +import java.io.IOException; + +/** + * Stream that permits positional reading. + * + * Implementations are required to implement thread-safe operations; this may + * be supported by concurrent access to the data, or by using a synchronization + * mechanism to serialize access. + * + * Not all implementations meet this requirement. Those that do not cannot + * be used as a backing store for some applications, such as Apache HBase. + * + * Independent of whether or not they are thread safe, some implementations + * may make the intermediate state of the system, specifically the position + * obtained in {@code Seekable.getPos()} visible. + */ +public interface PositionedReadable { + /** + * Read up to the specified number of bytes, from a given + * position within a file, and return the number of bytes read. This does not + * change the current offset of a file, and is thread-safe. + * + * Warning: Not all filesystems satisfy the thread-safety requirement. + * @param position position within file + * @param buffer destination buffer + * @param offset offset in the buffer + * @param length number of bytes to read + * @return actual number of bytes read; -1 means "none" + * @throws IOException IO problems. 
+ */ + int read(long position, byte[] buffer, int offset, int length) + throws IOException; + + /** + * Read the specified number of bytes, from a given + * position within a file. This does not + * change the current offset of a file, and is thread-safe. + * + * Warning: Not all filesystems satisfy the thread-safety requirement. + * @param position position within file + * @param buffer destination buffer + * @param offset offset in the buffer + * @param length number of bytes to read + * @throws IOException IO problems. + * @throws EOFException the end of the data was reached before + * the read operation completed + */ + void readFully(long position, byte[] buffer, int offset, int length) + throws IOException; + + /** + * Read number of bytes equal to the length of the buffer, from a given + * position within a file. This does not + * change the current offset of a file, and is thread-safe. + * + * Warning: Not all filesystems satisfy the thread-safety requirement. + * @param position position within file + * @param buffer destination buffer + * @throws IOException IO problems. + * @throws EOFException the end of the data was reached before + * the read operation completed + */ + void readFully(long position, byte[] buffer) throws IOException; +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/Seekable.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/Seekable.java new file mode 100644 index 00000000..66a8d3dd --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/fs/Seekable.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +/** + * Stream that permits seeking. + */ +public interface Seekable { + /** + * Seek to the given offset from the start of the file. + * The next read() will be from that location. Can't + * seek past the end of the file. + */ + void seek(long pos) throws IOException; + + /** + * Return the current offset from the start of the file + */ + long getPos() throws IOException; + + /** + * Seeks a different copy of the data. Returns true if + * found a new source, false otherwise. 
+ */ + boolean seekToNewSource(long targetPos) throws IOException; +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CodecPool.java new file mode 100644 index 00000000..3e6873c2 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CodecPool.java @@ -0,0 +1,21 @@ +package org.apache.hadoop.io.compress; + +public final class CodecPool { + + private CodecPool() { /* prevent instantiation */ } + public static Decompressor getDecompressor(CompressionCodec codec) { + return codec.createDecompressor(); + } + + public static void returnDecompressor(Decompressor decompressor) { + + } + + public static Compressor getCompressor(CompressionCodec codec) { + return codec.createCompressor(); + } + + public static void returnCompressor(Compressor compressor) { + + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java new file mode 100644 index 00000000..3d5263e0 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionCodec.java @@ -0,0 +1,11 @@ +package org.apache.hadoop.io.compress; + +import java.io.InputStream; +import java.io.OutputStream; + +public interface CompressionCodec { + Decompressor createDecompressor(); + Compressor createCompressor(); + CompressionInputStream createInputStream(InputStream is, Decompressor d); + CompressionOutputStream createOutputStream(OutputStream os, Compressor c); +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java new file mode 100644 index 00000000..2539b301 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionInputStream.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; + +import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; +/** + * A compression input stream. + * + *

Implementations are assumed to be buffered. This permits clients to + * reposition the underlying input stream then call {@link #resetState()}, + * without having to also synchronize client buffers. + */ +public abstract class CompressionInputStream extends InputStream implements Seekable { + /** + * The input stream to be compressed. + */ + protected final InputStream in; + protected long maxAvailableData; + + private Decompressor trackedDecompressor; + + /** + * Create a compression input stream that reads + * the decompressed bytes from the given stream. + * + * @param in The input stream to be compressed. + * @throws IOException + */ + protected CompressionInputStream(InputStream in) throws IOException { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + this.maxAvailableData = in.available(); + } + this.in = in; + } + + @Override + public void close() throws IOException { + try { + in.close(); + } finally { + if (trackedDecompressor != null) { + CodecPool.returnDecompressor(trackedDecompressor); + trackedDecompressor = null; + } + } + } + + /** + * Read bytes from the stream. + * Made abstract to prevent leakage to underlying stream. + */ + @Override + public abstract int read(byte[] b, int off, int len) throws IOException; + + /** + * Reset the decompressor to its initial state and discard any buffered data, + * as the underlying stream may have been repositioned. + */ + public abstract void resetState() throws IOException; + + /** + * This method returns the current position in the stream. + * + * @return Current position in stream as a long + */ + @Override + public long getPos() throws IOException { + if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) { + //This way of getting the current position will not work for file + //size which can be fit in an int and hence can not be returned by + //available method. + return this.maxAvailableData - this.in.available(); + } else { + return ((Seekable)this.in).getPos(); + } + + } + + /** + * This method is current not supported. + * + * @throws UnsupportedOperationException + */ + + @Override + public void seek(long pos) throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + /** + * This method is current not supported. + * + * @throws UnsupportedOperationException + */ + @Override + public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException { + throw new UnsupportedOperationException(); + } + + void setTrackedDecompressor(Decompressor decompressor) { + trackedDecompressor = decompressor; + } +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java new file mode 100644 index 00000000..b0ac3482 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressionOutputStream.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A compression output stream. + */ +public abstract class CompressionOutputStream extends OutputStream { + /** + * The output stream to be compressed. + */ + protected final OutputStream out; + + /** + * If non-null, this is the Compressor object that we should call + * CodecPool#returnCompressor on when this stream is closed. + */ + private Compressor trackedCompressor; + + /** + * Create a compression output stream that writes + * the compressed bytes to the given stream. + * @param out + */ + protected CompressionOutputStream(OutputStream out) { + this.out = out; + } + + void setTrackedCompressor(Compressor compressor) { + trackedCompressor = compressor; + } + + @Override + public void close() throws IOException { + try { + finish(); + } finally { + try { + out.close(); + } finally { + if (trackedCompressor != null) { + CodecPool.returnCompressor(trackedCompressor); + trackedCompressor = null; + } + } + } + } + + @Override + public void flush() throws IOException { + out.flush(); + } + + /** + * Write compressed bytes to the stream. + * Made abstract to prevent leakage to underlying stream. + */ + @Override + public abstract void write(byte[] b, int off, int len) throws IOException; + + /** + * Finishes writing compressed data to the output stream + * without closing the underlying stream. + */ + public abstract void finish() throws IOException; + + /** + * Reset the compression to the initial state. + * Does not reset the underlying stream. + */ + public abstract void resetState() throws IOException; + +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/Compressor.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/Compressor.java new file mode 100644 index 00000000..3aa4e002 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/Compressor.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; + +/** + * Specification of a stream-based 'compressor' which can be + * plugged into a {@link CompressionOutputStream} to compress data. 
+ * This is modelled after {@link java.util.zip.Deflater} + * + */ +public interface Compressor { + /** + * Sets input data for compression. + * This should be called whenever #needsInput() returns + * true indicating that more input data is required. + * + * @param b Input data + * @param off Start offset + * @param len Length + */ + void setInput(byte[] b, int off, int len); + + /** + * Returns true if the input data buffer is empty and + * #setInput() should be called to provide more input. + * + * @return true if the input data buffer is empty and + * #setInput() should be called in order to provide more input. + */ + boolean needsInput(); + + /** + * Sets preset dictionary for compression. A preset dictionary + * is used when the history buffer can be predetermined. + * + * @param b Dictionary data bytes + * @param off Start offset + * @param len Length + */ + void setDictionary(byte[] b, int off, int len); + + /** + * Return number of uncompressed bytes input so far. + */ + long getBytesRead(); + + /** + * Return number of compressed bytes output so far. + */ + long getBytesWritten(); + + /** + * When called, indicates that compression should end + * with the current contents of the input buffer. + */ + void finish(); + + /** + * Returns true if the end of the compressed + * data output stream has been reached. + * @return true if the end of the compressed + * data output stream has been reached. + */ + boolean finished(); + + /** + * Fills specified buffer with compressed data. Returns actual number + * of bytes of compressed data. A return value of 0 indicates that + * needsInput() should be called in order to determine if more input + * data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + */ + int compress(byte[] b, int off, int len) throws IOException; + + /** + * Resets compressor so that a new set of input data can be processed. + */ + void reset(); + + /** + * Closes the compressor and discards any unprocessed input. + */ + void end(); + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ + void reinit(Configuration conf); +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java new file mode 100644 index 00000000..f1f066bb --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/CompressorStream.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.OutputStream; + +public class CompressorStream extends CompressionOutputStream { + protected Compressor compressor; + protected byte[] buffer; + protected boolean closed; + + public CompressorStream(OutputStream out, Compressor compressor, int bufferSize) { + super(out); + + if (out == null || compressor == null) { + throw new NullPointerException(); + } else if (bufferSize <= 0) { + throw new IllegalArgumentException("Illegal bufferSize"); + } + + this.compressor = compressor; + buffer = new byte[bufferSize]; + } + + public CompressorStream(OutputStream out, Compressor compressor) { + this(out, compressor, 512); + } + + /** + * Allow derived classes to directly set the underlying stream. + * + * @param out Underlying output stream. + */ + protected CompressorStream(OutputStream out) { + super(out); + } + + @Override + public void write(byte[] b, int off, int len) throws IOException { + // Sanity checks + if (compressor.finished()) { + throw new IOException("write beyond end of stream"); + } + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return; + } + + compressor.setInput(b, off, len); + while (!compressor.needsInput()) { + compress(); + } + } + + protected void compress() throws IOException { + int len = compressor.compress(buffer, 0, buffer.length); + if (len > 0) { + out.write(buffer, 0, len); + } + } + + @Override + public void finish() throws IOException { + if (!compressor.finished()) { + compressor.finish(); + while (!compressor.finished()) { + compress(); + } + } + } + + @Override + public void resetState() throws IOException { + compressor.reset(); + } + + @Override + public void close() throws IOException { + if (!closed) { + try { + super.close(); + } finally { + closed = true; + } + } + } + + private byte[] oneByte = new byte[1]; + @Override + public void write(int b) throws IOException { + oneByte[0] = (byte)(b & 0xff); + write(oneByte, 0, oneByte.length); + } + +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/Decompressor.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/Decompressor.java new file mode 100644 index 00000000..d5799037 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/Decompressor.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; + + +/** + * Specification of a stream-based 'de-compressor' which can be + * plugged into a {@link CompressionInputStream} to compress data. 
+ * This is modelled after {@link java.util.zip.Inflater} + * + */ +public interface Decompressor { + /** + * Sets input data for decompression. + * This should be called if and only if {@link #needsInput()} returns + * true indicating that more input data is required. + * (Both native and non-native versions of various Decompressors require + * that the data passed in via b[] remain unmodified until + * the caller is explicitly notified--via {@link #needsInput()}--that the + * buffer may be safely modified. With this requirement, an extra + * buffer-copy can be avoided.) + * + * @param b Input data + * @param off Start offset + * @param len Length + */ + void setInput(byte[] b, int off, int len); + + /** + * Returns true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called to + * provide more input. + * + * @return true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called in + * order to provide more input. + */ + boolean needsInput(); + + /** + * Sets preset dictionary for compression. A preset dictionary + * is used when the history buffer can be predetermined. + * + * @param b Dictionary data bytes + * @param off Start offset + * @param len Length + */ + void setDictionary(byte[] b, int off, int len); + + /** + * Returns true if a preset dictionary is needed for decompression. + * @return true if a preset dictionary is needed for decompression + */ + boolean needsDictionary(); + + /** + * Returns true if the end of the decompressed + * data output stream has been reached. Indicates a concatenated data stream + * when finished() returns true and {@link #getRemaining()} + * returns a positive value. finished() will be reset with the + * {@link #reset()} method. + * @return true if the end of the decompressed + * data output stream has been reached. + */ + boolean finished(); + + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of uncompressed data. + * @throws IOException + */ + int decompress(byte[] b, int off, int len) throws IOException; + + /** + * Returns the number of bytes remaining in the compressed data buffer. + * Indicates a concatenated data stream if {@link #finished()} returns + * true and getRemaining() returns a positive value. If + * {@link #finished()} returns true and getRemaining() returns + * a zero value, indicates that the end of data stream has been reached and + * is not a concatenated data stream. + * @return The number of bytes remaining in the compressed data buffer. + */ + int getRemaining(); + + /** + * Resets decompressor and input and output buffers so that a new set of + * input data can be processed. If {@link #finished()}} returns + * true and {@link #getRemaining()} returns a positive value, + * reset() is called before processing of the next data stream in the + * concatenated data stream. {@link #finished()} will be reset and will + * return false when reset() is called. + */ + void reset(); + + /** + * Closes the decompressor and discards any unprocessed input. 
+ */ + void end(); +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java new file mode 100644 index 00000000..a516d4b2 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/io/compress/DecompressorStream.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; + +public class DecompressorStream extends CompressionInputStream { + /** + * The maximum input buffer size. + */ + private static final int MAX_INPUT_BUFFER_SIZE = 512; + /** + * MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to + * use when skipping. See {@link java.io.InputStream}. + */ + private static final int MAX_SKIP_BUFFER_SIZE = 2048; + + private byte[] skipBytes; + private byte[] oneByte = new byte[1]; + + protected Decompressor decompressor; + protected byte[] buffer; + protected boolean eof; + protected boolean closed; + private int lastBytesSent; + + DecompressorStream(InputStream in, Decompressor decompressor, + int bufferSize, int skipBufferSize) + throws IOException { + super(in); + + if (decompressor == null) { + throw new NullPointerException(); + } else if (bufferSize <= 0) { + throw new IllegalArgumentException("Illegal bufferSize"); + } + + this.decompressor = decompressor; + buffer = new byte[bufferSize]; + skipBytes = new byte[skipBufferSize]; + } + + public DecompressorStream(InputStream in, Decompressor decompressor, + int bufferSize) + throws IOException { + this(in, decompressor, bufferSize, MAX_SKIP_BUFFER_SIZE); + } + + public DecompressorStream(InputStream in, Decompressor decompressor) + throws IOException { + this(in, decompressor, MAX_INPUT_BUFFER_SIZE); + } + + /** + * Allow derived classes to directly set the underlying stream. + * + * @param in Underlying input stream. + * @throws IOException + */ + protected DecompressorStream(InputStream in) throws IOException { + super(in); + } + + @Override + public int read() throws IOException { + checkStream(); + return (read(oneByte, 0, oneByte.length) == -1) ? 
-1 : (oneByte[0] & 0xff); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + checkStream(); + + if ((off | len | (off + len) | (b.length - (off + len))) < 0) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + return decompress(b, off, len); + } + + protected int decompress(byte[] b, int off, int len) throws IOException { + int n; + + while ((n = decompressor.decompress(b, off, len)) == 0) { + if (decompressor.needsDictionary()) { + eof = true; + return -1; + } + + if (decompressor.finished()) { + // First see if there was any leftover buffered input from previous + // stream; if not, attempt to refill buffer. If refill -> EOF, we're + // all done; else reset, fix up input buffer, and get ready for next + // concatenated substream/"member". + int nRemaining = decompressor.getRemaining(); + if (nRemaining == 0) { + int m = getCompressedData(); + if (m == -1) { + // apparently the previous end-of-stream was also end-of-file: + // return success, as if we had never called getCompressedData() + eof = true; + return -1; + } + decompressor.reset(); + decompressor.setInput(buffer, 0, m); + lastBytesSent = m; + } else { + // looks like it's a concatenated stream: reset low-level zlib (or + // other engine) and buffers, then "resend" remaining input data + decompressor.reset(); + int leftoverOffset = lastBytesSent - nRemaining; + assert leftoverOffset >= 0; + // this recopies userBuf -> direct buffer if using native libraries: + decompressor.setInput(buffer, leftoverOffset, nRemaining); + // NOTE: this is the one place we do NOT want to save the number + // of bytes sent (nRemaining here) into lastBytesSent: since we + // are resending what we've already sent before, offset is nonzero + // in general (only way it could be zero is if it already equals + // nRemaining), which would then screw up the offset calculation + // _next_ time around. IOW, getRemaining() is in terms of the + // original, zero-offset bufferload, so lastBytesSent must be as + // well. Cheesy ASCII art: + // + // <------------ m, lastBytesSent -----------> + // +===============================================+ + // buffer: |1111111111|22222222222222222|333333333333| | + // +===============================================+ + // #1: <-- off -->|<-------- nRemaining ---------> + // #2: <----------- off ----------->|<-- nRem. --> + // #3: (final substream: nRemaining == 0; eof = true) + // + // If lastBytesSent is anything other than m, as shown, then "off" + // will be calculated incorrectly. 
+ } + } else if (decompressor.needsInput()) { + int m = getCompressedData(); + if (m == -1) { + throw new EOFException("Unexpected end of input stream"); + } + decompressor.setInput(buffer, 0, m); + lastBytesSent = m; + } + } + + return n; + } + + protected int getCompressedData() throws IOException { + checkStream(); + + // note that the _caller_ is now required to call setInput() or throw + return in.read(buffer, 0, buffer.length); + } + + protected void checkStream() throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + } + + @Override + public void resetState() throws IOException { + decompressor.reset(); + } + + @Override + public long skip(long n) throws IOException { + // Sanity checks + if (n < 0) { + throw new IllegalArgumentException("negative skip length"); + } + checkStream(); + + // Read 'n' bytes + int skipped = 0; + while (skipped < n) { + int len = Math.min((int)n - skipped, skipBytes.length); + len = read(skipBytes, 0, len); + if (len == -1) { + eof = true; + break; + } + skipped += len; + } + return skipped; + } + + @Override + public int available() throws IOException { + checkStream(); + return eof ? 0 : 1; + } + + @Override + public void close() throws IOException { + if (!closed) { + try { + super.close(); + } finally { + closed = true; + } + } + } + + @Override + public boolean markSupported() { + return false; + } + + @Override + public synchronized void mark(int readlimit) { + } + + @Override + public synchronized void reset() throws IOException { + throw new IOException("mark/reset not supported"); + } + +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/Job.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/Job.java new file mode 100644 index 00000000..f262d0dd --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/Job.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce; + +public class Job extends JobContext { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/JobContext.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/JobContext.java new file mode 100644 index 00000000..e781120e --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/JobContext.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce; + +public class JobContext { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java new file mode 100644 index 00000000..6cf0bd24 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce; + +public interface OutputCommitter { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/RecordReader.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/RecordReader.java new file mode 100644 index 00000000..92e3474c --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/RecordReader.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce; + +public class RecordReader { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/RecordWriter.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/RecordWriter.java new file mode 100644 index 00000000..a578a73d --- /dev/null +++ 
b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/RecordWriter.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce; + +public class RecordWriter { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java new file mode 100644 index 00000000..e5e80827 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/TaskAttemptContext.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce; + +public class TaskAttemptContext extends JobContext { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java new file mode 100644 index 00000000..62e748d5 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce.lib.input; + +public class FileInputFormat { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java new file mode 100644 index 00000000..aa66b4f0 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -0,0 +1,6 @@ +package org.apache.hadoop.mapreduce.lib.output; + +import org.apache.hadoop.mapreduce.OutputCommitter; + +public class FileOutputCommitter implements OutputCommitter { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java new file mode 100644 index 00000000..daa34d02 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputFormat.java @@ -0,0 +1,4 @@ +package org.apache.hadoop.mapreduce.lib.output; + +public class FileOutputFormat { +} diff --git a/third-party/parquet-floor/src/main/java/org/apache/hadoop/util/ReflectionUtils.java b/third-party/parquet-floor/src/main/java/org/apache/hadoop/util/ReflectionUtils.java new file mode 100644 index 00000000..d4f0c9f2 --- /dev/null +++ b/third-party/parquet-floor/src/main/java/org/apache/hadoop/util/ReflectionUtils.java @@ -0,0 +1,22 @@ +package org.apache.hadoop.util; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; + +public final class ReflectionUtils { + + private ReflectionUtils() { /* prevent instantitation */ } + + public static Object newInstance(Class type, Configuration x) { + try { + Object o = type.newInstance(); + if (o instanceof Configurable) { + ((Configurable) o).setConf(x); + } + return o; + } catch (Exception e) { + e.printStackTrace(); + return null; + } + } +}
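For reference, here is a rough sketch of how the write path of the new `blue.strategic.parquet` API above is meant to be used. It is not part of the change itself: the `SensorReading` record, the schema, and the `readings.parquet` path are hypothetical, and only the `ParquetWriter`, `Dehydrator`, and `ValueWriter` calls come from the code added in this diff (their generic parameters appear to have been stripped by the formatting above), together with the standard `Types` schema builder from parquet-column.

```java
import java.io.File;
import java.io.IOException;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;

import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.ParquetWriter;

public class ParquetFloorWriteSketch {
    /** Hypothetical domain object, used only for this sketch. */
    record SensorReading(long id, String name) { }

    public static void main(String[] args) throws IOException {
        // Declare the Parquet schema up front; the BINARY column carries the string
        // logical type annotation that SimpleWriteSupport.writeField() checks for.
        MessageType schema = Types.buildMessage()
                .required(PrimitiveTypeName.INT64).named("id")
                .required(PrimitiveTypeName.BINARY).as(LogicalTypeAnnotation.stringType()).named("name")
                .named("SensorReading");

        // A Dehydrator flattens one domain object into named column values.
        Dehydrator<SensorReading> dehydrator = (reading, valueWriter) -> {
            valueWriter.write("id", reading.id());
            valueWriter.write("name", reading.name());
        };

        // writeFile wires the schema and dehydrator into a Parquet writer.
        try (var writer = ParquetWriter.writeFile(schema, new File("readings.parquet"), dehydrator)) {
            writer.write(new SensorReading(1, "first"));
            writer.write(new SensorReading(2, "second"));
        }
    }
}
```

As the `ParquetWriter` constructor above shows, output defaults to Snappy compression and the Parquet v2 writer version.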
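The read path, again only a sketch under the same assumptions: rows are hydrated into plain maps here, and the file name carries over from the hypothetical writer sketch. Values arrive typed by the primitive column type (for example `Long` for INT64), while BINARY columns are handed back already stringified by `ParquetReader.readValue()`.

```java
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import blue.strategic.parquet.Hydrator;
import blue.strategic.parquet.HydratorSupplier;
import blue.strategic.parquet.ParquetReader;

public class ParquetFloorReadSketch {
    public static void main(String[] args) throws IOException {
        // A Hydrator accumulates column values into a mutable target (a map here)
        // via start/add; finish then seals it into the final row object.
        Hydrator<Map<String, Object>, Map<String, Object>> hydrator = new Hydrator<>() {
            @Override
            public Map<String, Object> start() {
                return new HashMap<>();
            }

            @Override
            public Map<String, Object> add(Map<String, Object> target, String heading, Object value) {
                target.put(heading, value);
                return target;
            }

            @Override
            public Map<String, Object> finish(Map<String, Object> target) {
                return target;
            }
        };

        // Stream every row; closing the stream releases the underlying file reader.
        try (var rows = ParquetReader.streamContent(new File("readings.parquet"),
                HydratorSupplier.constantly(hydrator))) {
            rows.forEach(System.out::println);
        }

        // Optionally project a subset of columns by name...
        try (var names = ParquetReader.streamContent(new File("readings.parquet"),
                HydratorSupplier.constantly(hydrator), List.of("name"))) {
            System.out.println(names.count() + " rows");
        }

        // ...or inspect the footer without hydrating any rows.
        System.out.println(ParquetReader.readMetadata(new File("readings.parquet")).getBlocks().size()
                + " row group(s)");
    }
}
```

There is also `ParquetReader.streamContentToStrings(File)` for quick ad-hoc dumps of a file's contents as `heading=value` strings.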