BooleanRLEValuesDecoder.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet.batchreader.decoders.rle;
import com.facebook.presto.parquet.batchreader.BytesUtils;
import com.facebook.presto.parquet.batchreader.decoders.ValuesDecoder.BooleanValuesDecoder;
import org.apache.parquet.Preconditions;
import org.apache.parquet.io.ParquetDecodingException;
import org.openjdk.jol.info.ClassLayout;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import static com.google.common.base.Preconditions.checkState;
import static java.util.Objects.requireNonNull;
/**
 * Decodes boolean values (one output byte per value, 0 or 1) from a buffer that holds a
 * sequence of runs. Each run starts with an unsigned varint header whose lowest bit selects
 * the run type:
 * <ul>
 * <li>bit clear: an RLE run; {@code header >>> 1} is the repeat count and the next byte is the repeated value</li>
 * <li>bit set: a bit-packed run; {@code header >>> 1} is the number of 8-value groups, one input byte per group</li>
 * </ul>
 * The decoder keeps the position inside the current run between calls, so consecutive
 * {@link #readNext(byte[], int, int)} and {@link #skip(int)} calls continue where the previous one stopped.
 */
public class BooleanRLEValuesDecoder
        implements BooleanValuesDecoder
{
    private static final int INSTANCE_SIZE = ClassLayout.parseClass(BooleanRLEValuesDecoder.class).instanceSize();

    private final ByteBuffer inputBuffer;

    // Type of the run currently being consumed; null until the first header is read.
    private MODE mode;
    // Number of values of the current run that have not been consumed yet.
    private int currentCount;
    // Repeated value of the current RLE run.
    private byte currentValue;
    // Number of bits of currentByte that were already consumed (0 means no partially consumed byte).
    private int currentByteOffset;
    // Last packed byte that was only partially consumed.
    private byte currentByte;

    /**
     * @param inputBuffer buffer positioned at the first run header; it is consumed as values are decoded
     * @throws NullPointerException if {@code inputBuffer} is null
     */
    public BooleanRLEValuesDecoder(ByteBuffer inputBuffer)
    {
        this.inputBuffer = requireNonNull(inputBuffer, "inputBuffer is null");
    }

    /**
     * Reads an unsigned variable-length integer (7 payload bits per byte, least significant
     * group first, high bit set on every byte except the last) from the buffer's current position.
     * Copied from BytesUtils.readUnsignedVarInt(InputStream in).
     *
     * @param in buffer to read from; its position is advanced past the varint
     * @return the decoded value
     */
    public static int readUnsignedVarInt(ByteBuffer in)
    {
        int value = 0;
        int i;
        // get() returns a signed byte; sign extension is harmless because only the low 8 bits are
        // inspected while the continuation bit is set, and the final byte has that bit clear.
        int b = in.get();
        for (i = 0; (b & 128) != 0; i += 7) {
            value |= (b & 127) << i;
            b = in.get();
        }
        return value | b << i;
    }

    /**
     * Decodes the next {@code length} values into {@code values[offset .. offset + length)}.
     *
     * @param values destination array; each decoded value is written as 0 or 1
     * @param offset index of the first destination entry
     * @param length number of values to decode
     * @throws IllegalStateException if the stream yields an empty run before {@code length} values were decoded
     */
    @Override
    public void readNext(byte[] values, int offset, int length)
    {
        int destinationIndex = offset;
        int remainingToCopy = length;
        while (remainingToCopy > 0) {
            if (currentCount == 0) {
                readNext();
                if (currentCount == 0) {
                    // Empty run: nothing more can be produced; reported by the check below.
                    break;
                }
            }

            int numEntriesToFill = Math.min(remainingToCopy, currentCount);
            int endIndex = destinationIndex + numEntriesToFill;
            switch (mode) {
                case RLE: {
                    byte rleValue = currentValue;
                    while (destinationIndex < endIndex) {
                        values[destinationIndex] = rleValue;
                        destinationIndex++;
                    }
                    break;
                }
                case PACKED: {
                    int remainingPackedBlock = numEntriesToFill;
                    if (currentByteOffset > 0) {
                        // read from the partial values remaining in current byte
                        int readChunk = Math.min(remainingPackedBlock, 8 - currentByteOffset);

                        final byte inValue = currentByte;
                        for (int i = 0; i < readChunk; i++) {
                            values[destinationIndex++] = (byte) (inValue >> currentByteOffset & 1);
                            currentByteOffset++;
                        }

                        remainingPackedBlock -= readChunk;
                        // Wraps to 0 once all 8 bits of the saved byte have been consumed.
                        currentByteOffset = currentByteOffset % 8;
                    }

                    final ByteBuffer localInputBuffer = inputBuffer;
                    // Whole bytes: each input byte yields 8 output entries.
                    while (remainingPackedBlock >= 8) {
                        BytesUtils.unpack8Values(localInputBuffer.get(), values, destinationIndex);
                        remainingPackedBlock -= 8;
                        destinationIndex += 8;
                    }

                    if (remainingPackedBlock > 0) {
                        // read partial values from current byte until the requested length is satisfied
                        byte inValue = localInputBuffer.get();
                        for (int i = 0; i < remainingPackedBlock; i++) {
                            values[destinationIndex++] = (byte) (inValue >> i & 1);
                        }

                        // Remember the byte and how many of its bits were used for the next call.
                        currentByte = inValue;
                        currentByteOffset = remainingPackedBlock;
                    }

                    break;
                }
                default:
                    throw new ParquetDecodingException("not a valid mode " + mode);
            }
            currentCount -= numEntriesToFill;
            remainingToCopy -= numEntriesToFill;
        }

        // Same contract as skip(): never return normally with part of the request unfulfilled,
        // otherwise the caller would consume stale entries of the destination array.
        checkState(remainingToCopy == 0, "Invalid read size request");
    }

    /**
     * Advances the decoder by {@code length} values without materializing them.
     *
     * @param length number of values to skip
     * @throws IllegalStateException if the stream yields an empty run before {@code length} values were skipped
     */
    @Override
    public void skip(int length)
    {
        int remainingToSkip = length;
        while (remainingToSkip > 0) {
            if (currentCount == 0) {
                readNext();
                if (currentCount == 0) {
                    break;
                }
            }

            int numEntriesToSkip = Math.min(remainingToSkip, currentCount);
            switch (mode) {
                case RLE:
                    // Nothing to consume from the buffer; only the run counter changes.
                    break;
                case PACKED: {
                    int remainingPackedBlock = numEntriesToSkip;
                    if (currentByteOffset > 0) {
                        // skip the partial values remaining in current byte
                        int skipChunk = Math.min(remainingPackedBlock, 8 - currentByteOffset);
                        currentByteOffset += skipChunk;
                        remainingPackedBlock -= skipChunk;
                        currentByteOffset = currentByteOffset % 8;
                    }

                    int fullBytes = remainingPackedBlock / 8;
                    if (fullBytes > 0) {
                        // Cast so the call binds to Buffer.position(int) rather than a ByteBuffer-specific overload.
                        ((Buffer) inputBuffer).position(inputBuffer.position() + fullBytes);
                    }

                    remainingPackedBlock = remainingPackedBlock % 8;
                    if (remainingPackedBlock > 0) {
                        // load the byte holding the next values and mark the skipped bits as consumed
                        currentByte = inputBuffer.get();
                        currentByteOffset = remainingPackedBlock;
                    }

                    break;
                }
                default:
                    throw new ParquetDecodingException("not a valid mode " + mode);
            }
            currentCount -= numEntriesToSkip;
            remainingToSkip -= numEntriesToSkip;
        }
        checkState(remainingToSkip == 0, "Invalid read size request");
    }

    /**
     * @return the shallow size of this instance plus the size of the input buffer's storage
     */
    @Override
    public long getRetainedSizeInBytes()
    {
        // array() throws for buffers without an accessible backing array (direct or read-only);
        // fall back to the buffer capacity in that case.
        int bufferSize = inputBuffer.hasArray() ? inputBuffer.array().length : inputBuffer.capacity();
        return INSTANCE_SIZE + bufferSize;
    }

    /**
     * Reads the header of the next run and resets the per-run state
     * ({@code mode}, {@code currentCount} and either {@code currentValue} or {@code currentByteOffset}).
     *
     * @throws IllegalArgumentException if the input buffer has no remaining bytes
     */
    private void readNext()
    {
        Preconditions.checkArgument(inputBuffer.hasRemaining(), "Reading past RLE/BitPacking stream.");
        int header = readUnsignedVarInt(inputBuffer);
        mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
        switch (mode) {
            case RLE:
                currentCount = header >>> 1;
                currentValue = inputBuffer.get();
                return;
            case PACKED:
                int numGroups = header >>> 1;
                // One group is one input byte, i.e. 8 values.
                currentCount = numGroups * 8;
                currentByteOffset = 0;
                return;
            default:
                throw new ParquetDecodingException("not a valid mode " + mode);
        }
    }

    /** Type of a run, selected by the lowest bit of the run header. */
    private enum MODE
    {
        RLE,
        PACKED;
    }
}