/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.hive.filesystem.ExtendedFileSystem;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.util.InternalHiveSplitFactory;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputFormat;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;

import static com.facebook.presto.hive.HiveCommonSessionProperties.getNodeSelectionStrategy;
import static com.facebook.presto.hive.HiveErrorCode.MALFORMED_HIVE_FILE_STATISTICS;
import static com.facebook.presto.hive.HiveManifestUtils.FILE_NAMES;
import static com.facebook.presto.hive.HiveManifestUtils.FILE_SIZES;
import static com.facebook.presto.hive.HiveManifestUtils.MANIFEST_VERSION;
import static com.facebook.presto.hive.HiveManifestUtils.VERSION_1;
import static com.facebook.presto.hive.HiveManifestUtils.decompressFileNames;
import static com.facebook.presto.hive.HiveManifestUtils.decompressFileSizes;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxInitialSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.getMaxSplitSize;
import static com.facebook.presto.hive.HiveSessionProperties.isManifestVerificationEnabled;
import static com.facebook.presto.hive.HiveSessionProperties.isSkipEmptyFilesEnabled;
import static com.facebook.presto.hive.HiveUtil.buildDirectoryContextProperties;
import static com.facebook.presto.hive.HiveUtil.getInputFormat;
import static com.facebook.presto.hive.NestedDirectoryPolicy.IGNORED;
import static com.facebook.presto.hive.NestedDirectoryPolicy.RECURSE;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionKeys;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getPartitionLocation;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

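/**
 * A {@link PartitionLoader} for partitions whose file listing is stored as a compressed manifest
 * in the partition parameters ({@code FILE_NAMES} and {@code FILE_SIZES}). Splits are created
 * directly from the manifest, so the partition directory does not need to be listed unless
 * manifest verification is enabled.
 */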
public class ManifestPartitionLoader
        extends PartitionLoader
{
    // These constants mirror the placeholder block location values used by FileSystem.getFileBlockLocations in Hadoop
    private static final String[] BLOCK_LOCATION_NAMES = {"localhost:50010"};
    private static final String[] BLOCK_LOCATION_HOSTS = {"localhost"};

    private final Table table;
    private final Map<Integer, Domain> infoColumnConstraints;
    private final ConnectorSession session;
    private final HdfsEnvironment hdfsEnvironment;
    private final HdfsContext hdfsContext;
    private final NamenodeStats namenodeStats;
    private final DirectoryLister directoryLister;
    private final boolean recursiveDirWalkerEnabled;
    private final boolean schedulerUsesHostAddresses;

    public ManifestPartitionLoader(
            Table table,
            Map<Integer, Domain> infoColumnConstraints,
            ConnectorSession session,
            HdfsEnvironment hdfsEnvironment,
            NamenodeStats namenodeStats,
            DirectoryLister directoryLister,
            boolean recursiveDirWalkerEnabled,
            boolean schedulerUsesHostAddresses)
    {
        this.table = requireNonNull(table, "table is null");
        this.infoColumnConstraints = requireNonNull(infoColumnConstraints, "infoColumnConstraints is null");
        this.session = requireNonNull(session, "session is null");
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.hdfsContext = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), false);
        this.namenodeStats = requireNonNull(namenodeStats, "namenodeStats is null");
        this.directoryLister = requireNonNull(directoryLister, "directoryLister is null");
        this.recursiveDirWalkerEnabled = recursiveDirWalkerEnabled;
        this.schedulerUsesHostAddresses = schedulerUsesHostAddresses;
    }

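    /**
     * Creates splits for a partition from its manifest: verifies the manifest version, decompresses
     * the file names and sizes, optionally validates them against a directory listing, and builds
     * splits from the synthesized file statuses.
     */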
    @Override
    public ListenableFuture<?> loadPartition(HivePartitionMetadata partition, HiveSplitSource hiveSplitSource, boolean stopped)
            throws IOException
    {
        Path path = new Path(getPartitionLocation(table, partition.getPartition()));
        Map<String, String> parameters = partition.getPartition().get().getParameters();

        // TODO: Add support for more manifest versions
        // Verify the manifest version
        verify(VERSION_1.equals(parameters.get(MANIFEST_VERSION)), "Manifest version is not equal to %s", VERSION_1);

        List<String> fileNames = decompressFileNames(parameters.get(FILE_NAMES));
        List<Long> fileSizes = decompressFileSizes(parameters.get(FILE_SIZES));

        // Verify that fileNames and fileSizes have the same number of entries
        verify(fileNames.size() == fileSizes.size(), "Lists of fileNames and fileSizes differ in length");

        if (isManifestVerificationEnabled(session)) {
            // Verify that the file names and sizes in the manifest match those reported by the directory lister
            validateManifest(session, partition, path, fileNames, fileSizes);
        }

        ImmutableList.Builder<HiveFileInfo> fileListBuilder = ImmutableList.builder();
        for (int i = 0; i < fileNames.size(); i++) {
            Path filePath = new Path(path, fileNames.get(i));
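            // Synthesize a FileStatus from the manifest entry; the manifest records only name and
            // size, so the block size defaults to the max split size and the modification time to 0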
            FileStatus fileStatus = new FileStatus(fileSizes.get(i), false, 1, getMaxSplitSize(session).toBytes(), 0, filePath);
            try {
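                // Fabricate a single dummy block location so splits can be created without contacting the namenode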
                BlockLocation[] locations = {new BlockLocation(BLOCK_LOCATION_NAMES, BLOCK_LOCATION_HOSTS, 0, fileSizes.get(i))};

                // It is safe to leave extraFileContext empty because downstream code always checks whether it is present before proceeding.
                fileListBuilder.add(HiveFileInfo.createHiveFileInfo(new LocatedFileStatus(fileStatus, locations), Optional.empty()));
            }
            catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        InternalHiveSplitFactory splitFactory = createInternalHiveSplitFactory(table, partition, session, infoColumnConstraints, hdfsEnvironment, hdfsContext, schedulerUsesHostAddresses);

        return hiveSplitSource.addToQueue(fileListBuilder.build().stream()
                .map(status -> splitFactory.createInternalHiveSplit(status, true))
                .filter(Optional::isPresent)
                .map(Optional::get)
                .collect(toImmutableList()));
    }

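    /**
     * Builds an {@link InternalHiveSplitFactory} for the partition, resolving its storage format,
     * input format, partition keys, and file system from the partition location.
     */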
    private InternalHiveSplitFactory createInternalHiveSplitFactory(
            Table table,
            HivePartitionMetadata partition,
            ConnectorSession session,
            Map<Integer, Domain> infoColumnConstraints,
            HdfsEnvironment hdfsEnvironment,
            HdfsContext hdfsContext,
            boolean schedulerUsesHostAddresses)
            throws IOException
    {
        String partitionName = partition.getHivePartition().getPartitionId().getPartitionName();
        Storage storage = partition.getPartition().map(Partition::getStorage).orElse(table.getStorage());
        String inputFormatName = storage.getStorageFormat().getInputFormat();
        String serDe = storage.getStorageFormat().getSerDe();
        int partitionDataColumnCount = partition.getPartition()
                .map(p -> p.getColumns().size())
                .orElseGet(table.getDataColumns()::size);
        List<HivePartitionKey> partitionKeys = getPartitionKeys(table, partition.getPartition(), partitionName);
        String location = getPartitionLocation(table, partition.getPartition());
        Path path = new Path(location);
        Configuration configuration = hdfsEnvironment.getConfiguration(hdfsContext, path);
        InputFormat<?, ?> inputFormat = getInputFormat(configuration, inputFormatName, serDe, false);
        ExtendedFileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);

        return new InternalHiveSplitFactory(
                fileSystem,
                inputFormat,
                infoColumnConstraints,
                getNodeSelectionStrategy(session),
                getMaxInitialSplitSize(session),
                false,
                new HiveSplitPartitionInfo(
                        storage,
                        location,
                        partitionKeys,
                        partitionName,
                        partitionDataColumnCount,
                        partition.getTableToPartitionMapping(),
                        Optional.empty(),
                        partition.getRedundantColumnDomains(),
                        partition.getRowIdPartitionComponent()),
                schedulerUsesHostAddresses,
                partition.getEncryptionInformation());
    }

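    /**
     * Lists the partition directory and verifies that every file found in storage appears in the
     * manifest with a matching size, and that the manifest and storage agree on the file count.
     */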
    private void validateManifest(ConnectorSession session, HivePartitionMetadata partition, Path path, List<String> manifestFileNames, List<Long> manifestFileSizes)
            throws IOException
    {
        ExtendedFileSystem fileSystem = hdfsEnvironment.getFileSystem(hdfsContext, path);
        HiveDirectoryContext hiveDirectoryContext = new HiveDirectoryContext(
                recursiveDirWalkerEnabled ? RECURSE : IGNORED,
                false,
                isSkipEmptyFilesEnabled(session),
                hdfsContext.getIdentity(),
                buildDirectoryContextProperties(session),
                session.getRuntimeStats());

        Iterator<HiveFileInfo> fileInfoIterator = directoryLister.list(fileSystem, table, path, partition.getPartition(), namenodeStats, hiveDirectoryContext);
        int fileCount = 0;
        while (fileInfoIterator.hasNext()) {
            HiveFileInfo fileInfo = fileInfoIterator.next();
            String fileName = fileInfo.getFileName();
            int index = manifestFileNames.indexOf(fileName);
            if (index == -1) {
                throw new PrestoException(
                        MALFORMED_HIVE_FILE_STATISTICS,
                        format("Filename = %s not stored in manifest. Partition = %s, TableName = %s",
                                fileName,
                                partition.getHivePartition().getPartitionId(),
                                table.getTableName()));
            }
            if (!manifestFileSizes.get(index).equals(fileInfo.getLength())) {
                throw new PrestoException(
                        MALFORMED_HIVE_FILE_STATISTICS,
                        format("FilesizeFromManifest = %s is not equal to FilesizeFromStorage = %s. File = %s, Partition = %s, TableName = %s",
                                manifestFileSizes.get(index),
                                fileInfo.getLength(),
                                fileName,
                                partition.getHivePartition().getPartitionId(),
                                table.getTableName()));
            }

            fileCount++;
        }

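        // Every file found in storage was matched against the manifest above, so a count mismatch
        // means the manifest references files that are missing from storage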
        if (fileCount != manifestFileNames.size()) {
            throw new PrestoException(
                    MALFORMED_HIVE_FILE_STATISTICS,
                    format("Number of files in Manifest = %s is not equal to Number of files in storage = %s. Partition = %s, TableName = %s",
                            manifestFileNames.size(),
                            fileCount,
                            partition.getHivePartition().getPartitionId(),
                            table.getTableName()));
        }
    }
}