(parquet) Add parquet library

This small library, while great, will require some modifications to fit the project's needs, so it goes into third-party directly.
This commit is contained in:
Viktor Lofgren 2023-09-05 10:38:51 +02:00
parent 07d7507ac6
commit a284682deb
38 changed files with 1756 additions and 3 deletions

View File

@@ -22,9 +22,12 @@ To set up a local test environment, follow the instructions in [📄 run/readme.md](run/readme.md)!
 ## Hardware Requirements
-A production-like environment requires at least 128 Gb of RAM and ideally 2 Tb+ of enterprise
-grade SSD storage, as well as some additional terabytes of slower harddrives for storing crawl
-data. It can be made to run on smaller hardware by limiting size of the index.
+A production-like environment requires a lot of RAM and ideally enterprise SSDs for
+the index, as well as some additional terabytes of slower harddrives for storing crawl
+data. It can be made to run on smaller hardware by limiting size of the index.
+The system will definitely run on a 32 Gb machine, possibly smaller, but at that size it may not perform
+very well as it relies on disk caching to be fast.
+A local developer's deployment is possible with much smaller hardware (and index size).

View File

@@ -83,6 +83,7 @@ include 'third-party:count-min-sketch'
 include 'third-party:monkey-patch-opennlp'
 include 'third-party:monkey-patch-gson'
 include 'third-party:commons-codec'
+include 'third-party:parquet-floor'
 dependencyResolutionManagement {

20
third-party/parquet-floor/build.gradle vendored Normal file
View File

@@ -0,0 +1,20 @@
plugins {
id 'java'
}
java {
toolchain {
languageVersion.set(JavaLanguageVersion.of(20))
}
}
dependencies {
implementation 'org.apache.parquet:parquet-column:1.13.1'
implementation('org.apache.parquet:parquet-hadoop:1.13.1') {
exclude group: 'commons-pool', module: 'commons-pool'
}
}
test {
useJUnitPlatform()
}
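Other modules in the build can then consume the vendored library by project path rather than as an external artifact. A minimal sketch of such a dependency declaration (the consuming module itself is hypothetical; the project path follows the settings.gradle include above):

dependencies {
    // vendored parquet-floor sources instead of an upstream artifact
    implementation project(':third-party:parquet-floor')
}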

8
third-party/parquet-floor/readme.md vendored Normal file
View File

@@ -0,0 +1,8 @@
# Parquet Floor
License: APL 2.0
Git: https://github.com/strategicblue/parquet-floor
It's basically an adaptor for Parquet I/O without
needing to pull half of Hadoop into your project.
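As a quick orientation for the API being vendored here, a minimal write-path sketch follows. The UrlEntry record and the output file name are hypothetical; the schema builder (org.apache.parquet.schema.Types) comes from the parquet-column dependency declared above, and ParquetWriter, Dehydrator and ValueWriter are the classes added in this commit:

package example;

import blue.strategic.parquet.Dehydrator;
import blue.strategic.parquet.ParquetWriter;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Types;

import java.io.File;
import java.io.IOException;

public class WriteExample {
    // hypothetical record type used for illustration only
    record UrlEntry(long id, String url) {}

    public static void main(String[] args) throws IOException {
        // schema restricted to primitives the vendored writer supports
        MessageType schema = Types.buildMessage()
                .required(PrimitiveType.PrimitiveTypeName.INT64).named("id")
                .required(PrimitiveType.PrimitiveTypeName.BINARY)
                    .as(LogicalTypeAnnotation.stringType()).named("url")
                .named("UrlEntry");

        // the dehydrator maps one record onto the columns of a Parquet row
        Dehydrator<UrlEntry> dehydrator = (rec, valueWriter) -> {
            valueWriter.write("id", rec.id());
            valueWriter.write("url", rec.url());
        };

        try (ParquetWriter<UrlEntry> writer =
                     ParquetWriter.writeFile(schema, new File("urls.parquet"), dehydrator)) {
            writer.write(new UrlEntry(1, "https://example.com/"));
        }
    }
}

The schema deliberately sticks to the primitive types that SimpleWriteSupport (in the ParquetWriter file below) knows how to write: INT32, INT64, FLOAT, DOUBLE, BOOLEAN and string-annotated BINARY.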

View File

@@ -0,0 +1,14 @@
package blue.strategic.parquet;
/**
* Dehydrates a rich java object into a Parquet row.
*/
public interface Dehydrator<T> {
/**
* Write the specified record into the Parquet row using the supplied writer.
* @param record the rich java object
* @param valueWriter facilitates writing to the Parquet row
*/
void dehydrate(T record, ValueWriter valueWriter);
}

View File

@@ -0,0 +1,29 @@
package blue.strategic.parquet;
/**
* Creates and hydrates a rich domain object from a Parquet row.
*/
public interface Hydrator<U, S> {
/**
* Creates a new mutable instance to be hydrated.
* @return new instance to be hydrated
*/
U start();
/**
* Hydrates the target instance by applying the specified value from the Parquet row.
* @param target object being hydrated
* @param heading the name of the column whose value is being applied
* @param value the value to apply
* @return the new target
*/
U add(U target, String heading, Object value);
/**
* Seals the mutable hydration target.
* @param target object being hydrated
* @return the sealed object
*/
S finish(U target);
}

View File

@@ -0,0 +1,20 @@
package blue.strategic.parquet;
import org.apache.parquet.column.ColumnDescriptor;
import java.util.List;
/**
* Supplies hydrators.
*/
public interface HydratorSupplier<U, S> {
/**
* Supplies a hydrator from the specified list of columns. Values will always be added to the hydrator
* in the same order as the columns supplied to this function.
*/
Hydrator<U, S> get(List<ColumnDescriptor> columns);
static <A, B> HydratorSupplier<A, B> constantly(final Hydrator<A, B> hydrator) {
return columns -> hydrator;
}
}

View File

@@ -0,0 +1,260 @@
package blue.strategic.parquet;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReadStore;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.io.DelegatingSeekableInputStream;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import java.io.Closeable;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public final class ParquetReader<U, S> implements Spliterator<S>, Closeable {
private final ParquetFileReader reader;
private final Hydrator<U, S> hydrator;
private final List<ColumnDescriptor> columns;
private final MessageType schema;
private final GroupConverter recordConverter;
private final String createdBy;
private boolean finished;
private long currentRowGroupSize = -1L;
private List<ColumnReader> currentRowGroupColumnReaders;
private long currentRowIndex = -1L;
public static <U, S> Stream<S> streamContent(File file, HydratorSupplier<U, S> hydrator) throws IOException {
return streamContent(file, hydrator, null);
}
public static <U, S> Stream<S> streamContent(File file, HydratorSupplier<U, S> hydrator, Collection<String> columns) throws IOException {
return streamContent(makeInputFile(file), hydrator, columns);
}
public static <U, S> Stream<S> streamContent(InputFile file, HydratorSupplier<U, S> hydrator) throws IOException {
return streamContent(file, hydrator, null);
}
public static <U, S> Stream<S> streamContent(InputFile file, HydratorSupplier<U, S> hydrator, Collection<String> columns) throws IOException {
return stream(spliterator(file, hydrator, columns));
}
public static <U, S> ParquetReader<U, S> spliterator(File file, HydratorSupplier<U, S> hydrator) throws IOException {
return spliterator(file, hydrator, null);
}
public static <U, S> ParquetReader<U, S> spliterator(File file, HydratorSupplier<U, S> hydrator, Collection<String> columns) throws IOException {
return spliterator(makeInputFile(file), hydrator, columns);
}
public static <U, S> ParquetReader<U, S> spliterator(InputFile file, HydratorSupplier<U, S> hydrator) throws IOException {
return spliterator(file, hydrator, null);
}
public static <U, S> ParquetReader<U, S> spliterator(InputFile file, HydratorSupplier<U, S> hydrator, Collection<String> columns) throws IOException {
Set<String> columnSet = (null == columns) ? Collections.emptySet() : Set.copyOf(columns);
return new ParquetReader<>(file, columnSet, hydrator);
}
public static <U, S> Stream<S> stream(ParquetReader<U, S> reader) {
return StreamSupport
.stream(reader, false)
.onClose(() -> closeSilently(reader));
}
public static Stream<String[]> streamContentToStrings(File file) throws IOException {
return stream(spliterator(makeInputFile(file), columns -> {
final AtomicInteger pos = new AtomicInteger(0);
return new Hydrator<String[], String[]>() {
@Override
public String[] start() {
return new String[columns.size()];
}
@Override
public String[] add(String[] target, String heading, Object value) {
target[pos.getAndIncrement()] = heading + "=" + value.toString();
return target;
}
@Override
public String[] finish(String[] target) {
return target;
}
};
}, null));
}
public static ParquetMetadata readMetadata(File file) throws IOException {
return readMetadata(makeInputFile(file));
}
public static ParquetMetadata readMetadata(InputFile file) throws IOException {
try (ParquetFileReader reader = ParquetFileReader.open(file)) {
return reader.getFooter();
}
}
private ParquetReader(InputFile file, Set<String> columnNames, HydratorSupplier<U, S> hydratorSupplier) throws IOException {
this.reader = ParquetFileReader.open(file);
FileMetaData meta = reader.getFooter().getFileMetaData();
this.schema = meta.getSchema();
this.recordConverter = new DummyRecordConverter(this.schema).getRootConverter();
this.createdBy = meta.getCreatedBy();
this.columns = schema.getColumns().stream()
.filter(c -> columnNames.isEmpty() || columnNames.contains(c.getPath()[0]))
.collect(Collectors.toList());
this.hydrator = hydratorSupplier.get(this.columns);
}
private static void closeSilently(Closeable resource) {
try {
resource.close();
} catch (Exception e) {
// ignore
}
}
private static Object readValue(ColumnReader columnReader) {
ColumnDescriptor column = columnReader.getDescriptor();
PrimitiveType primitiveType = column.getPrimitiveType();
int maxDefinitionLevel = column.getMaxDefinitionLevel();
if (columnReader.getCurrentDefinitionLevel() == maxDefinitionLevel) {
switch (primitiveType.getPrimitiveTypeName()) {
case BINARY:
case FIXED_LEN_BYTE_ARRAY:
case INT96:
return primitiveType.stringifier().stringify(columnReader.getBinary());
case BOOLEAN:
return columnReader.getBoolean();
case DOUBLE:
return columnReader.getDouble();
case FLOAT:
return columnReader.getFloat();
case INT32:
return columnReader.getInteger();
case INT64:
return columnReader.getLong();
default:
throw new IllegalArgumentException("Unsupported type: " + primitiveType);
}
} else {
return null;
}
}
@Override
public void close() throws IOException {
reader.close();
}
@Override
public boolean tryAdvance(Consumer<? super S> action) {
try {
if (this.finished) {
return false;
}
if (currentRowIndex == currentRowGroupSize) {
PageReadStore rowGroup = reader.readNextRowGroup();
if (rowGroup == null) {
this.finished = true;
return false;
}
ColumnReadStore columnReadStore = new ColumnReadStoreImpl(rowGroup, this.recordConverter, this.schema, this.createdBy);
this.currentRowGroupSize = rowGroup.getRowCount();
this.currentRowGroupColumnReaders = columns.stream().map(columnReadStore::getColumnReader).collect(Collectors.toList());
this.currentRowIndex = 0L;
}
U record = hydrator.start();
for (ColumnReader columnReader: this.currentRowGroupColumnReaders) {
record = hydrator.add(record, columnReader.getDescriptor().getPath()[0], readValue(columnReader));
columnReader.consume();
if (columnReader.getCurrentRepetitionLevel() != 0) {
throw new IllegalStateException("Unexpected repetition");
}
}
action.accept(hydrator.finish(record));
this.currentRowIndex++;
return true;
} catch (Exception e) {
throw new RuntimeException("Failed to read parquet", e);
}
}
@Override
public Spliterator<S> trySplit() {
return null;
}
@Override
public long estimateSize() {
return reader.getRecordCount();
}
@Override
public int characteristics() {
return ORDERED | NONNULL | DISTINCT;
}
public ParquetMetadata metaData() {
return this.reader.getFooter();
}
public static InputFile makeInputFile(File file) {
return new InputFile() {
@Override
public long getLength() {
return file.length();
}
@Override
public SeekableInputStream newStream() throws IOException {
FileInputStream fis = new FileInputStream(file);
return new DelegatingSeekableInputStream(fis) {
private long position;
@Override
public long getPos() {
return position;
}
@Override
public void seek(long newPos) throws IOException {
fis.getChannel().position(newPos);
position = newPos;
}
};
}
};
}
}
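For ad-hoc inspection the reader also offers streamContentToStrings and readMetadata, which need no custom hydrator. A sketch (the file name is hypothetical):

package example;

import blue.strategic.parquet.ParquetReader;

import java.io.File;
import java.io.IOException;

public class InspectExample {
    public static void main(String[] args) throws IOException {
        File file = new File("urls.parquet"); // hypothetical file

        // one String[] per row, each element rendered as "column=value"
        try (var rows = ParquetReader.streamContentToStrings(file)) {
            rows.map(fields -> String.join(", ", fields)).forEach(System.out::println);
        }

        // footer metadata; printing just the schema here
        System.out.println(ParquetReader.readMetadata(file).getFileMetaData().getSchema());
    }
}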

View File

@@ -0,0 +1,166 @@
package blue.strategic.parquet;
import org.apache.hadoop.conf.Configuration;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.api.WriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.DelegatingPositionOutputStream;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import java.io.Closeable;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Collections;
public final class ParquetWriter<T> implements Closeable {
private final org.apache.parquet.hadoop.ParquetWriter<T> writer;
public static <T> ParquetWriter<T> writeFile(MessageType schema, File out, Dehydrator<T> dehydrator) throws IOException {
OutputFile f = new OutputFile() {
@Override
public PositionOutputStream create(long blockSizeHint) throws IOException {
return createOrOverwrite(blockSizeHint);
}
@Override
public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
FileOutputStream fos = new FileOutputStream(out);
return new DelegatingPositionOutputStream(fos) {
@Override
public long getPos() throws IOException {
return fos.getChannel().position();
}
};
}
@Override
public boolean supportsBlockSize() {
return false;
}
@Override
public long defaultBlockSize() {
return 1024L;
}
};
return writeOutputFile(schema, f, dehydrator);
}
private static <T> ParquetWriter<T> writeOutputFile(MessageType schema, OutputFile file, Dehydrator<T> dehydrator) throws IOException {
return new ParquetWriter<>(file, schema, dehydrator);
}
private ParquetWriter(OutputFile outputFile, MessageType schema, Dehydrator<T> dehydrator) throws IOException {
this.writer = new Builder<T>(outputFile)
.withType(schema)
.withDehydrator(dehydrator)
.withCompressionCodec(CompressionCodecName.SNAPPY)
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_2_0)
.build();
}
public void write(T record) throws IOException {
writer.write(record);
}
@Override
public void close() throws IOException {
this.writer.close();
}
private static final class Builder<T> extends org.apache.parquet.hadoop.ParquetWriter.Builder<T, ParquetWriter.Builder<T>> {
private MessageType schema;
private Dehydrator<T> dehydrator;
private Builder(OutputFile file) {
super(file);
}
public ParquetWriter.Builder<T> withType(MessageType schema) {
this.schema = schema;
return this;
}
public ParquetWriter.Builder<T> withDehydrator(Dehydrator<T> dehydrator) {
this.dehydrator = dehydrator;
return this;
}
@Override
protected ParquetWriter.Builder<T> self() {
return this;
}
@Override
protected WriteSupport<T> getWriteSupport(Configuration conf) {
return new SimpleWriteSupport<>(schema, dehydrator);
}
}
private static class SimpleWriteSupport<T> extends WriteSupport<T> {
private final MessageType schema;
private final Dehydrator<T> dehydrator;
private final ValueWriter valueWriter = SimpleWriteSupport.this::writeField;
private RecordConsumer recordConsumer;
SimpleWriteSupport(MessageType schema, Dehydrator<T> dehydrator) {
this.schema = schema;
this.dehydrator = dehydrator;
}
@Override
public WriteContext init(Configuration configuration) {
return new WriteContext(schema, Collections.emptyMap());
}
@Override
public void prepareForWrite(RecordConsumer recordConsumer) {
this.recordConsumer = recordConsumer;
}
@Override
public void write(T record) {
recordConsumer.startMessage();
dehydrator.dehydrate(record, valueWriter);
recordConsumer.endMessage();
}
@Override
public String getName() {
return "blue.strategic.parquet.ParquetWriter";
}
private void writeField(String name, Object value) {
int fieldIndex = schema.getFieldIndex(name);
PrimitiveType type = schema.getType(fieldIndex).asPrimitiveType();
recordConsumer.startField(name, fieldIndex);
switch (type.getPrimitiveTypeName()) {
case INT32: recordConsumer.addInteger((int)value); break;
case INT64: recordConsumer.addLong((long)value); break;
case DOUBLE: recordConsumer.addDouble((double)value); break;
case BOOLEAN: recordConsumer.addBoolean((boolean)value); break;
case FLOAT: recordConsumer.addFloat((float)value); break;
case BINARY:
if (type.getLogicalTypeAnnotation() == LogicalTypeAnnotation.stringType()) {
recordConsumer.addBinary(Binary.fromString((String)value));
} else {
throw new UnsupportedOperationException("We don't support writing " + type.getLogicalTypeAnnotation());
}
break;
default:
throw new UnsupportedOperationException("We don't support writing " + type.getPrimitiveTypeName());
}
recordConsumer.endField(name, fieldIndex);
}
}
}

View File

@@ -0,0 +1,5 @@
package blue.strategic.parquet;
public interface ValueWriter {
void write(String name, Object value);
}

View File

@@ -0,0 +1,5 @@
package org.apache.hadoop.conf;
public interface Configurable {
void setConf(Configuration conf);
}

View File

@@ -0,0 +1,19 @@
package org.apache.hadoop.conf;
public class Configuration {
public boolean getBoolean(String x, boolean y) {
return y;
}
public void setBoolean(String x, boolean y) {
}
public int getInt(String x, int y) {
return y;
}
public String get(String x) {
return null;
}
}

View File

@@ -0,0 +1,51 @@
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.RandomAccessFile;
public class FSDataInputStream extends InputStream {
private final RandomAccessFile input;
public FSDataInputStream(org.apache.hadoop.fs.Path p) throws FileNotFoundException {
this.input = new RandomAccessFile(p.file(), "r");
}
@Override
public int read() throws IOException {
return input.read();
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
try {
input.readFully(buf, off, len);
return len;
} catch (IOException e) {
e.printStackTrace();
return -1;
}
}
public void seek(long pos) {
try {
input.seek(pos);
} catch (IOException e) {
e.printStackTrace();
}
}
public void readFully(byte[] buf, int a, int b) {
try {
input.readFully(buf, a, b);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void close() throws IOException {
input.close();
}
}

View File

@@ -0,0 +1,28 @@
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.RandomAccessFile;
public class FSDataOutputStream extends OutputStream {
private final RandomAccessFile output;
public FSDataOutputStream(org.apache.hadoop.fs.Path p) throws FileNotFoundException {
this.output = new RandomAccessFile(p.file(), "rw");
}
@Override
public void write(int b) throws IOException {
this.output.write(b);
}
@Override
public void close() throws IOException {
output.close();
}
public long getPos() throws IOException {
return this.output.getFilePointer();
}
}

View File

@@ -0,0 +1,21 @@
package org.apache.hadoop.fs;
public class FileStatus {
private final org.apache.hadoop.fs.Path path;
public FileStatus(org.apache.hadoop.fs.Path p) {
path = p;
}
public boolean isFile() {
return true;
}
public org.apache.hadoop.fs.Path getPath() {
return path;
}
public long getLen() {
return path.file().length();
}
}

View File

@@ -0,0 +1,51 @@
package org.apache.hadoop.fs;
import java.io.FileNotFoundException;
import java.net.URI;
import java.net.URISyntaxException;
public class FileSystem {
public FileStatus getFileStatus(org.apache.hadoop.fs.Path p) {
return new FileStatus(p);
}
public org.apache.hadoop.fs.Path makeQualified(org.apache.hadoop.fs.Path p) {
return p;
}
public URI getUri() {
try {
return new URI("http://localhost/");
} catch (URISyntaxException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public short getDefaultReplication(org.apache.hadoop.fs.Path p) {
return 0;
}
public long getDefaultBlockSize(org.apache.hadoop.fs.Path p) {
return 1024;
}
public FSDataInputStream open(org.apache.hadoop.fs.Path p) {
try {
return new FSDataInputStream(p);
} catch (FileNotFoundException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
public org.apache.hadoop.fs.FSDataOutputStream create(org.apache.hadoop.fs.Path p, boolean a, int b, short c, long d) {
try {
return new FSDataOutputStream(p);
} catch (FileNotFoundException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}
}

View File

@@ -0,0 +1,22 @@
package org.apache.hadoop.fs;
import org.apache.hadoop.conf.Configuration;
import java.io.File;
public class Path {
private final File file;
public Path(String path) {
file = new File(path);
}
public FileSystem getFileSystem(Configuration conf) {
return new FileSystem();
}
public File file() {
return file;
}
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.fs;
public interface PathFilter {
}

View File

@@ -0,0 +1,84 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.EOFException;
import java.io.IOException;
/**
* Stream that permits positional reading.
*
* Implementations are required to implement thread-safe operations; this may
* be supported by concurrent access to the data, or by using a synchronization
* mechanism to serialize access.
*
* Not all implementations meet this requirement. Those that do not cannot
* be used as a backing store for some applications, such as Apache HBase.
*
* Independent of whether or not they are thread safe, some implementations
* may make the intermediate state of the system, specifically the position
* obtained in {@code Seekable.getPos()} visible.
*/
public interface PositionedReadable {
/**
* Read up to the specified number of bytes, from a given
* position within a file, and return the number of bytes read. This does not
* change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @param offset offset in the buffer
* @param length number of bytes to read
* @return actual number of bytes read; -1 means "none"
* @throws IOException IO problems.
*/
int read(long position, byte[] buffer, int offset, int length)
throws IOException;
/**
* Read the specified number of bytes, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @param offset offset in the buffer
* @param length number of bytes to read
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/
void readFully(long position, byte[] buffer, int offset, int length)
throws IOException;
/**
* Read number of bytes equal to the length of the buffer, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*
* <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
* @param position position within file
* @param buffer destination buffer
* @throws IOException IO problems.
* @throws EOFException the end of the data was reached before
* the read operation completed
*/
void readFully(long position, byte[] buffer) throws IOException;
}

View File

@@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
/**
* Stream that permits seeking.
*/
public interface Seekable {
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
void seek(long pos) throws IOException;
/**
* Return the current offset from the start of the file
*/
long getPos() throws IOException;
/**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
boolean seekToNewSource(long targetPos) throws IOException;
}

View File

@@ -0,0 +1,21 @@
package org.apache.hadoop.io.compress;
public final class CodecPool {
private CodecPool() { /* prevent instantiation */ }
public static Decompressor getDecompressor(CompressionCodec codec) {
return codec.createDecompressor();
}
public static void returnDecompressor(Decompressor decompressor) {
}
public static Compressor getCompressor(CompressionCodec codec) {
return codec.createCompressor();
}
public static void returnCompressor(Compressor compressor) {
}
}

View File

@@ -0,0 +1,11 @@
package org.apache.hadoop.io.compress;
import java.io.InputStream;
import java.io.OutputStream;
public interface CompressionCodec {
Decompressor createDecompressor();
Compressor createCompressor();
CompressionInputStream createInputStream(InputStream is, Decompressor d);
CompressionOutputStream createOutputStream(OutputStream os, Compressor c);
}

View File

@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
/**
* A compression input stream.
*
* <p>Implementations are assumed to be buffered. This permits clients to
* reposition the underlying input stream then call {@link #resetState()},
* without having to also synchronize client buffers.
*/
public abstract class CompressionInputStream extends InputStream implements Seekable {
/**
* The input stream to be compressed.
*/
protected final InputStream in;
protected long maxAvailableData;
private Decompressor trackedDecompressor;
/**
* Create a compression input stream that reads
* the decompressed bytes from the given stream.
*
* @param in The input stream to be compressed.
* @throws IOException
*/
protected CompressionInputStream(InputStream in) throws IOException {
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
this.maxAvailableData = in.available();
}
this.in = in;
}
@Override
public void close() throws IOException {
try {
in.close();
} finally {
if (trackedDecompressor != null) {
CodecPool.returnDecompressor(trackedDecompressor);
trackedDecompressor = null;
}
}
}
/**
* Read bytes from the stream.
* Made abstract to prevent leakage to underlying stream.
*/
@Override
public abstract int read(byte[] b, int off, int len) throws IOException;
/**
* Reset the decompressor to its initial state and discard any buffered data,
* as the underlying stream may have been repositioned.
*/
public abstract void resetState() throws IOException;
/**
* This method returns the current position in the stream.
*
* @return Current position in stream as a long
*/
@Override
public long getPos() throws IOException {
if (!(in instanceof Seekable) || !(in instanceof PositionedReadable)) {
//This way of getting the current position will not work for file
//size which can be fit in an int and hence can not be returned by
//available method.
return this.maxAvailableData - this.in.available();
} else {
return ((Seekable)this.in).getPos();
}
}
/**
* This method is currently not supported.
*
* @throws UnsupportedOperationException
*/
@Override
public void seek(long pos) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}
/**
* This method is currently not supported.
*
* @throws UnsupportedOperationException
*/
@Override
public boolean seekToNewSource(long targetPos) throws UnsupportedOperationException {
throw new UnsupportedOperationException();
}
void setTrackedDecompressor(Decompressor decompressor) {
trackedDecompressor = decompressor;
}
}

View File

@@ -0,0 +1,92 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.io.OutputStream;
/**
* A compression output stream.
*/
public abstract class CompressionOutputStream extends OutputStream {
/**
* The output stream to be compressed.
*/
protected final OutputStream out;
/**
* If non-null, this is the Compressor object that we should call
* CodecPool#returnCompressor on when this stream is closed.
*/
private Compressor trackedCompressor;
/**
* Create a compression output stream that writes
* the compressed bytes to the given stream.
* @param out
*/
protected CompressionOutputStream(OutputStream out) {
this.out = out;
}
void setTrackedCompressor(Compressor compressor) {
trackedCompressor = compressor;
}
@Override
public void close() throws IOException {
try {
finish();
} finally {
try {
out.close();
} finally {
if (trackedCompressor != null) {
CodecPool.returnCompressor(trackedCompressor);
trackedCompressor = null;
}
}
}
}
@Override
public void flush() throws IOException {
out.flush();
}
/**
* Write compressed bytes to the stream.
* Made abstract to prevent leakage to underlying stream.
*/
@Override
public abstract void write(byte[] b, int off, int len) throws IOException;
/**
* Finishes writing compressed data to the output stream
* without closing the underlying stream.
*/
public abstract void finish() throws IOException;
/**
* Reset the compression to the initial state.
* Does not reset the underlying stream.
*/
public abstract void resetState() throws IOException;
}

View File

@@ -0,0 +1,116 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
/**
* Specification of a stream-based 'compressor' which can be
* plugged into a {@link CompressionOutputStream} to compress data.
* This is modelled after {@link java.util.zip.Deflater}
*
*/
public interface Compressor {
/**
* Sets input data for compression.
* This should be called whenever #needsInput() returns
* <code>true</code> indicating that more input data is required.
*
* @param b Input data
* @param off Start offset
* @param len Length
*/
void setInput(byte[] b, int off, int len);
/**
* Returns true if the input data buffer is empty and
* #setInput() should be called to provide more input.
*
* @return <code>true</code> if the input data buffer is empty and
* #setInput() should be called in order to provide more input.
*/
boolean needsInput();
/**
* Sets preset dictionary for compression. A preset dictionary
* is used when the history buffer can be predetermined.
*
* @param b Dictionary data bytes
* @param off Start offset
* @param len Length
*/
void setDictionary(byte[] b, int off, int len);
/**
* Return number of uncompressed bytes input so far.
*/
long getBytesRead();
/**
* Return number of compressed bytes output so far.
*/
long getBytesWritten();
/**
* When called, indicates that compression should end
* with the current contents of the input buffer.
*/
void finish();
/**
* Returns true if the end of the compressed
* data output stream has been reached.
* @return <code>true</code> if the end of the compressed
* data output stream has been reached.
*/
boolean finished();
/**
* Fills specified buffer with compressed data. Returns actual number
* of bytes of compressed data. A return value of 0 indicates that
* needsInput() should be called in order to determine if more input
* data is required.
*
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of compressed data.
*/
int compress(byte[] b, int off, int len) throws IOException;
/**
* Resets compressor so that a new set of input data can be processed.
*/
void reset();
/**
* Closes the compressor and discards any unprocessed input.
*/
void end();
/**
* Prepare the compressor to be used in a new stream with settings defined in
* the given Configuration
*
* @param conf Configuration from which new setting are fetched
*/
void reinit(Configuration conf);
}

View File

@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
import java.io.OutputStream;
public class CompressorStream extends CompressionOutputStream {
protected Compressor compressor;
protected byte[] buffer;
protected boolean closed;
public CompressorStream(OutputStream out, Compressor compressor, int bufferSize) {
super(out);
if (out == null || compressor == null) {
throw new NullPointerException();
} else if (bufferSize <= 0) {
throw new IllegalArgumentException("Illegal bufferSize");
}
this.compressor = compressor;
buffer = new byte[bufferSize];
}
public CompressorStream(OutputStream out, Compressor compressor) {
this(out, compressor, 512);
}
/**
* Allow derived classes to directly set the underlying stream.
*
* @param out Underlying output stream.
*/
protected CompressorStream(OutputStream out) {
super(out);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
// Sanity checks
if (compressor.finished()) {
throw new IOException("write beyond end of stream");
}
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return;
}
compressor.setInput(b, off, len);
while (!compressor.needsInput()) {
compress();
}
}
protected void compress() throws IOException {
int len = compressor.compress(buffer, 0, buffer.length);
if (len > 0) {
out.write(buffer, 0, len);
}
}
@Override
public void finish() throws IOException {
if (!compressor.finished()) {
compressor.finish();
while (!compressor.finished()) {
compress();
}
}
}
@Override
public void resetState() throws IOException {
compressor.reset();
}
@Override
public void close() throws IOException {
if (!closed) {
try {
super.close();
} finally {
closed = true;
}
}
}
private byte[] oneByte = new byte[1];
@Override
public void write(int b) throws IOException {
oneByte[0] = (byte)(b & 0xff);
write(oneByte, 0, oneByte.length);
}
}

View File

@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.IOException;
/**
* Specification of a stream-based 'de-compressor' which can be
* plugged into a {@link CompressionInputStream} to decompress data.
* This is modelled after {@link java.util.zip.Inflater}
*
*/
public interface Decompressor {
/**
* Sets input data for decompression.
* This should be called if and only if {@link #needsInput()} returns
* <code>true</code> indicating that more input data is required.
* (Both native and non-native versions of various Decompressors require
* that the data passed in via <code>b[]</code> remain unmodified until
* the caller is explicitly notified--via {@link #needsInput()}--that the
* buffer may be safely modified. With this requirement, an extra
* buffer-copy can be avoided.)
*
* @param b Input data
* @param off Start offset
* @param len Length
*/
void setInput(byte[] b, int off, int len);
/**
* Returns <code>true</code> if the input data buffer is empty and
* {@link #setInput(byte[], int, int)} should be called to
* provide more input.
*
* @return <code>true</code> if the input data buffer is empty and
* {@link #setInput(byte[], int, int)} should be called in
* order to provide more input.
*/
boolean needsInput();
/**
* Sets preset dictionary for compression. A preset dictionary
* is used when the history buffer can be predetermined.
*
* @param b Dictionary data bytes
* @param off Start offset
* @param len Length
*/
void setDictionary(byte[] b, int off, int len);
/**
* Returns <code>true</code> if a preset dictionary is needed for decompression.
* @return <code>true</code> if a preset dictionary is needed for decompression
*/
boolean needsDictionary();
/**
* Returns <code>true</code> if the end of the decompressed
* data output stream has been reached. Indicates a concatenated data stream
* when finished() returns <code>true</code> and {@link #getRemaining()}
* returns a positive value. finished() will be reset with the
* {@link #reset()} method.
* @return <code>true</code> if the end of the decompressed
* data output stream has been reached.
*/
boolean finished();
/**
* Fills specified buffer with uncompressed data. Returns actual number
* of bytes of uncompressed data. A return value of 0 indicates that
* {@link #needsInput()} should be called in order to determine if more
* input data is required.
*
* @param b Buffer for the compressed data
* @param off Start offset of the data
* @param len Size of the buffer
* @return The actual number of bytes of uncompressed data.
* @throws IOException
*/
int decompress(byte[] b, int off, int len) throws IOException;
/**
* Returns the number of bytes remaining in the compressed data buffer.
* Indicates a concatenated data stream if {@link #finished()} returns
* <code>true</code> and getRemaining() returns a positive value. If
* {@link #finished()} returns <code>true</code> and getRemaining() returns
* a zero value, indicates that the end of data stream has been reached and
* is not a concatenated data stream.
* @return The number of bytes remaining in the compressed data buffer.
*/
int getRemaining();
/**
* Resets decompressor and input and output buffers so that a new set of
* input data can be processed. If {@link #finished()}} returns
* <code>true</code> and {@link #getRemaining()} returns a positive value,
* reset() is called before processing of the next data stream in the
* concatenated data stream. {@link #finished()} will be reset and will
* return <code>false</code> when reset() is called.
*/
void reset();
/**
* Closes the decompressor and discards any unprocessed input.
*/
void end();
}

View File

@@ -0,0 +1,239 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io.compress;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
public class DecompressorStream extends CompressionInputStream {
/**
* The maximum input buffer size.
*/
private static final int MAX_INPUT_BUFFER_SIZE = 512;
/**
* MAX_SKIP_BUFFER_SIZE is used to determine the maximum buffer size to
* use when skipping. See {@link java.io.InputStream}.
*/
private static final int MAX_SKIP_BUFFER_SIZE = 2048;
private byte[] skipBytes;
private byte[] oneByte = new byte[1];
protected Decompressor decompressor;
protected byte[] buffer;
protected boolean eof;
protected boolean closed;
private int lastBytesSent;
DecompressorStream(InputStream in, Decompressor decompressor,
int bufferSize, int skipBufferSize)
throws IOException {
super(in);
if (decompressor == null) {
throw new NullPointerException();
} else if (bufferSize <= 0) {
throw new IllegalArgumentException("Illegal bufferSize");
}
this.decompressor = decompressor;
buffer = new byte[bufferSize];
skipBytes = new byte[skipBufferSize];
}
public DecompressorStream(InputStream in, Decompressor decompressor,
int bufferSize)
throws IOException {
this(in, decompressor, bufferSize, MAX_SKIP_BUFFER_SIZE);
}
public DecompressorStream(InputStream in, Decompressor decompressor)
throws IOException {
this(in, decompressor, MAX_INPUT_BUFFER_SIZE);
}
/**
* Allow derived classes to directly set the underlying stream.
*
* @param in Underlying input stream.
* @throws IOException
*/
protected DecompressorStream(InputStream in) throws IOException {
super(in);
}
@Override
public int read() throws IOException {
checkStream();
return (read(oneByte, 0, oneByte.length) == -1) ? -1 : (oneByte[0] & 0xff);
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
checkStream();
if ((off | len | (off + len) | (b.length - (off + len))) < 0) {
throw new IndexOutOfBoundsException();
} else if (len == 0) {
return 0;
}
return decompress(b, off, len);
}
protected int decompress(byte[] b, int off, int len) throws IOException {
int n;
while ((n = decompressor.decompress(b, off, len)) == 0) {
if (decompressor.needsDictionary()) {
eof = true;
return -1;
}
if (decompressor.finished()) {
// First see if there was any leftover buffered input from previous
// stream; if not, attempt to refill buffer. If refill -> EOF, we're
// all done; else reset, fix up input buffer, and get ready for next
// concatenated substream/"member".
int nRemaining = decompressor.getRemaining();
if (nRemaining == 0) {
int m = getCompressedData();
if (m == -1) {
// apparently the previous end-of-stream was also end-of-file:
// return success, as if we had never called getCompressedData()
eof = true;
return -1;
}
decompressor.reset();
decompressor.setInput(buffer, 0, m);
lastBytesSent = m;
} else {
// looks like it's a concatenated stream: reset low-level zlib (or
// other engine) and buffers, then "resend" remaining input data
decompressor.reset();
int leftoverOffset = lastBytesSent - nRemaining;
assert leftoverOffset >= 0;
// this recopies userBuf -> direct buffer if using native libraries:
decompressor.setInput(buffer, leftoverOffset, nRemaining);
// NOTE: this is the one place we do NOT want to save the number
// of bytes sent (nRemaining here) into lastBytesSent: since we
// are resending what we've already sent before, offset is nonzero
// in general (only way it could be zero is if it already equals
// nRemaining), which would then screw up the offset calculation
// _next_ time around. IOW, getRemaining() is in terms of the
// original, zero-offset bufferload, so lastBytesSent must be as
// well. Cheesy ASCII art:
//
// <------------ m, lastBytesSent ----------->
// +===============================================+
// buffer: |1111111111|22222222222222222|333333333333| |
// +===============================================+
// #1: <-- off -->|<-------- nRemaining --------->
// #2: <----------- off ----------->|<-- nRem. -->
// #3: (final substream: nRemaining == 0; eof = true)
//
// If lastBytesSent is anything other than m, as shown, then "off"
// will be calculated incorrectly.
}
} else if (decompressor.needsInput()) {
int m = getCompressedData();
if (m == -1) {
throw new EOFException("Unexpected end of input stream");
}
decompressor.setInput(buffer, 0, m);
lastBytesSent = m;
}
}
return n;
}
protected int getCompressedData() throws IOException {
checkStream();
// note that the _caller_ is now required to call setInput() or throw
return in.read(buffer, 0, buffer.length);
}
protected void checkStream() throws IOException {
if (closed) {
throw new IOException("Stream closed");
}
}
@Override
public void resetState() throws IOException {
decompressor.reset();
}
@Override
public long skip(long n) throws IOException {
// Sanity checks
if (n < 0) {
throw new IllegalArgumentException("negative skip length");
}
checkStream();
// Read 'n' bytes
int skipped = 0;
while (skipped < n) {
int len = Math.min((int)n - skipped, skipBytes.length);
len = read(skipBytes, 0, len);
if (len == -1) {
eof = true;
break;
}
skipped += len;
}
return skipped;
}
@Override
public int available() throws IOException {
checkStream();
return eof ? 0 : 1;
}
@Override
public void close() throws IOException {
if (!closed) {
try {
super.close();
} finally {
closed = true;
}
}
}
@Override
public boolean markSupported() {
return false;
}
@Override
public synchronized void mark(int readlimit) {
}
@Override
public synchronized void reset() throws IOException {
throw new IOException("mark/reset not supported");
}
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce;
public class Job extends JobContext {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce;
public class JobContext {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce;
public interface OutputCommitter {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce;
public class RecordReader {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce;
public class RecordWriter {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce;
public class TaskAttemptContext extends JobContext {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce.lib.input;
public class FileInputFormat {
}

View File

@@ -0,0 +1,6 @@
package org.apache.hadoop.mapreduce.lib.output;
import org.apache.hadoop.mapreduce.OutputCommitter;
public class FileOutputCommitter implements OutputCommitter {
}

View File

@@ -0,0 +1,4 @@
package org.apache.hadoop.mapreduce.lib.output;
public class FileOutputFormat {
}

View File

@@ -0,0 +1,22 @@
package org.apache.hadoop.util;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
public final class ReflectionUtils {
private ReflectionUtils() { /* prevent instantiation */ }
public static Object newInstance(Class<?> type, Configuration x) {
try {
Object o = type.newInstance();
if (o instanceof Configurable) {
((Configurable) o).setConf(x);
}
return o;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
}