HdfsParquetDataSource.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive.parquet;

import com.facebook.presto.hive.FileFormatDataSourceStats;
import com.facebook.presto.parquet.AbstractParquetDataSource;
import com.facebook.presto.parquet.ParquetDataSourceId;
import com.facebook.presto.spi.PrestoException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Optional;

import static com.facebook.presto.hive.HiveErrorCode.HIVE_CANNOT_OPEN_SPLIT;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.google.common.base.Strings.nullToEmpty;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

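/**
 * {@link AbstractParquetDataSource} backed by an HDFS {@link FSDataInputStream}.
 * Data reads use positional {@code readFully}, which leaves the stream's current
 * offset untouched, while the column/offset index readers seek the shared stream.
 * Read throughput is reported to {@link FileFormatDataSourceStats}.
 *
 * <p>A minimal usage sketch; {@code fileSystem}, {@code path}, {@code fileLength},
 * and {@code stats} are illustrative placeholders, not members of this class:
 * <pre>{@code
 * HdfsParquetDataSource dataSource = buildHdfsParquetDataSource(
 *         fileSystem, path, 0, fileLength, stats);
 * }</pre>
 */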
public class HdfsParquetDataSource
        extends AbstractParquetDataSource
{
    private final FSDataInputStream inputStream;
    private final FileFormatDataSourceStats stats;

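    /**
     * @param id identifies the underlying file; used in error messages
     * @param inputStream open stream over the Parquet file; closed by {@link #close()}
     * @param stats sink for read-throughput metrics
     */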
    public HdfsParquetDataSource(ParquetDataSourceId id, FSDataInputStream inputStream, FileFormatDataSourceStats stats)
    {
        super(id);
        this.stats = requireNonNull(stats, "stats is null");
        this.inputStream = requireNonNull(inputStream, "inputStream is null");
    }

    @Override
    public void close()
            throws IOException
    {
        inputStream.close();
    }

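    /**
     * Reads {@code bufferLength} bytes at the absolute {@code position} with a positional
     * {@code readFully} (the stream's current offset is not moved) and records throughput.
     */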
    @Override
    protected void readInternal(long position, byte[] buffer, int bufferOffset, int bufferLength)
    {
        try {
            long start = System.nanoTime();
            inputStream.readFully(position, buffer, bufferOffset, bufferLength);
            stats.readDataBytesPerSecond(bufferLength, System.nanoTime() - start);
        }
        catch (PrestoException e) {
            // a wrapping FSDataInputStream (e.g. a Presto hook) may already throw
            // PrestoException; re-throw it as-is rather than wrapping it again below
            throw e;
        }
        catch (Exception e) {
            throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Error reading from %s at position %s", getId(), position), e);
        }
    }

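    /**
     * Reads the Parquet {@link ColumnIndex} (per-page min/max and null-count statistics)
     * for the given column chunk, or {@link Optional#empty()} if the file stores none.
     */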
    @Override
    public Optional<ColumnIndex> readColumnIndex(ColumnChunkMetaData column)
            throws IOException
    {
        IndexReference indexRef = column.getColumnIndexReference();
        if (indexRef == null) {
            return Optional.empty();
        }
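        // unlike readInternal, this moves the shared stream's position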
        inputStream.seek(indexRef.getOffset());
        return Optional.of(ParquetMetadataConverter.fromParquetColumnIndex(column.getPrimitiveType(), Util.readColumnIndex(inputStream)));
    }

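    /**
     * Reads the Parquet {@link OffsetIndex} (page locations within the column chunk)
     * for the given column chunk, or {@link Optional#empty()} if the file stores none.
     */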
    @Override
    public Optional<OffsetIndex> readOffsetIndex(ColumnChunkMetaData column)
            throws IOException
    {
        IndexReference indexRef = column.getOffsetIndexReference();
        if (indexRef == null) {
            return Optional.empty();
        }
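        // seeks the shared stream, as in readColumnIndex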
        inputStream.seek(indexRef.getOffset());
        return Optional.of(ParquetMetadataConverter.fromParquetOffsetIndex(Util.readOffsetIndex(inputStream)));
    }

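    /**
     * Opens {@code path} on {@code fileSystem} and wraps the stream as a data source;
     * {@code start} and {@code length} describe the split and appear only in error messages.
     */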
    public static HdfsParquetDataSource buildHdfsParquetDataSource(FileSystem fileSystem, Path path, long start, long length, FileFormatDataSourceStats stats)
    {
        try {
            FSDataInputStream inputStream = fileSystem.open(path);
            return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), inputStream, stats);
        }
        catch (Exception e) {
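            // a closed filesystem or a missing file is propagated without extra context;
            // all other failures get a message carrying the split coordinates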
            if (nullToEmpty(e.getMessage()).trim().equals("Filesystem closed") ||
                    e instanceof FileNotFoundException) {
                throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, e);
            }
            throw new PrestoException(HIVE_CANNOT_OPEN_SPLIT, format("Error opening Hive split %s (offset=%s, length=%s): %s", path, start, length, e.getMessage()), e);
        }
    }

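    /**
     * Wraps an already-open stream as a data source; {@link #close()} will close the
     * stream on behalf of the caller.
     */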
    public static HdfsParquetDataSource buildHdfsParquetDataSource(FSDataInputStream inputStream, Path path, FileFormatDataSourceStats stats)
    {
        return new HdfsParquetDataSource(new ParquetDataSourceId(path.toString()), inputStream, stats);
    }
}