ParquetColumnIndexStore.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.parquet.reader;
import com.facebook.presto.parquet.ParquetDataSource;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.filter2.columnindex.ColumnIndexStore;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import static java.util.Collections.emptySet;
/**
 * Internal implementation of {@link ColumnIndexStore} serving the column and offset indexes of the column
 * chunks of one {@link BlockMetaData}, restricted to a given set of column paths.
 * <p>
 * Offset indexes are read when the store is constructed; column indexes are read on first request and the
 * outcome of that read (including a failed one) is remembered.
 */
public class ParquetColumnIndexStore
        implements ColumnIndexStore
{
    /**
     * Per-column holder of the two page indexes. Implementations never return {@code null};
     * an unavailable index is reported as {@link Optional#empty()}.
     */
    private interface IndexStore
    {
        Optional<ColumnIndex> getColumnIndex();

        Optional<OffsetIndex> getOffsetIndex();
    }

    /**
     * Index holder backed by the enclosing store's data source for a single column chunk.
     */
    private class PageIndexStore
            implements IndexStore
    {
        private final ColumnChunkMetaData columnChunkMetadata;
        // Starts out empty so that a failed or not-yet-performed read is never observed as null.
        private Optional<ColumnIndex> columnIndex = Optional.empty();
        private boolean columnIndexRead;
        private final Optional<OffsetIndex> offsetIndex;

        /**
         * Reads the offset index of the given column chunk immediately.
         *
         * @param meta metadata of the column chunk whose indexes this holder serves
         * @throws MissingOffsetIndexException if reading the offset index fails with an {@link IOException}
         */
        PageIndexStore(ColumnChunkMetaData meta)
        {
            this.columnChunkMetadata = meta;
            try {
                this.offsetIndex = dataSource.readOffsetIndex(meta);
            }
            catch (IOException e) {
                // If the I/O issue still stands it will fail the reading later;
                // otherwise we fail the filtering only with a missing offset index.
                throw new MissingOffsetIndexException(meta.getPath());
            }
        }

        /**
         * Returns the column index, reading it from the data source on the first call only.
         * A read that fails with an {@link IOException} is treated as "no column index available".
         */
        @Override
        public Optional<ColumnIndex> getColumnIndex()
        {
            if (!columnIndexRead) {
                try {
                    columnIndex = dataSource.readColumnIndex(columnChunkMetadata);
                }
                catch (IOException ignored) {
                    // If the I/O issue still stands it will fail the reading later;
                    // otherwise we fail the filtering only with a missing column index.
                    // The field keeps its Optional.empty() initial value in this case.
                }
                // Remember the attempt even when it failed, so the read is not retried.
                columnIndexRead = true;
            }
            return columnIndex;
        }

        @Override
        public Optional<OffsetIndex> getOffsetIndex()
        {
            return offsetIndex;
        }
    }

    /**
     * Holder used for columns that are not present in the store: both indexes are absent.
     */
    private static final ParquetColumnIndexStore.IndexStore MISSING_INDEX_STORE = new IndexStore()
    {
        @Override
        public Optional<ColumnIndex> getColumnIndex()
        {
            return Optional.empty();
        }

        @Override
        public Optional<OffsetIndex> getOffsetIndex()
        {
            return Optional.empty();
        }
    };

    /**
     * Store handed out by {@link #create} when an offset index could not be read: it has no column
     * indexes and reports every offset index as missing.
     */
    private static final ParquetColumnIndexStore EMPTY = new ParquetColumnIndexStore(null, new BlockMetaData(), emptySet())
    {
        @Override
        public ColumnIndex getColumnIndex(ColumnPath column)
        {
            return null;
        }

        @Override
        public OffsetIndex getOffsetIndex(ColumnPath column)
        {
            throw new MissingOffsetIndexException(column);
        }
    };

    private final ParquetDataSource dataSource;
    private final Map<ColumnPath, ParquetColumnIndexStore.IndexStore> store;

    /**
     * Creates a column index store for the columns in {@code paths} (the set of columns used for the
     * projection). Offset indexes are read while the store is being built; column indexes are read
     * lazily on first access.
     *
     * @param dataSource source the indexes are read from
     * @param block block whose column chunks are considered
     * @param paths columns to keep indexes for; column chunks with other paths are skipped
     * @return the populated store, or a shared empty store if reading any offset index raised
     * {@link MissingOffsetIndexException}
     */
    public static ColumnIndexStore create(ParquetDataSource dataSource, BlockMetaData block, Set<ColumnPath> paths)
    {
        try {
            return new ParquetColumnIndexStore(dataSource, block, paths);
        }
        catch (MissingOffsetIndexException e) {
            return EMPTY;
        }
    }

    private ParquetColumnIndexStore(ParquetDataSource dataSource, BlockMetaData block, Set<ColumnPath> paths)
    {
        this.dataSource = dataSource;
        Map<ColumnPath, ParquetColumnIndexStore.IndexStore> indexesByPath = new HashMap<>();
        for (ColumnChunkMetaData column : block.getColumns()) {
            ColumnPath path = column.getPath();
            if (paths.contains(path)) {
                indexesByPath.put(path, new ParquetColumnIndexStore.PageIndexStore(column));
            }
        }
        this.store = indexesByPath;
    }

    /**
     * Returns the column index of the given column.
     *
     * @param column path of the column
     * @return the column index, or {@code null} if the column is not in this store or no column index
     * could be obtained for it
     */
    @Override
    public ColumnIndex getColumnIndex(ColumnPath column)
    {
        return store.getOrDefault(column, MISSING_INDEX_STORE).getColumnIndex().orElse(null);
    }

    /**
     * Returns the offset index of the given column.
     *
     * @param column path of the column
     * @return the offset index, or {@code null} if the column is not in this store or the data source
     * reported no offset index for it
     */
    // NOTE(review): an absent offset index yields null here rather than MissingOffsetIndexException;
    // confirm that callers of ColumnIndexStore#getOffsetIndex tolerate a null result.
    @Override
    public OffsetIndex getOffsetIndex(ColumnPath column)
    {
        return store.getOrDefault(column, MISSING_INDEX_STORE).getOffsetIndex().orElse(null);
    }
}