HiveStagingFileCommitter.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.hive.PartitionUpdate.FileWriteInfo;
import com.facebook.presto.spi.ConnectorSession;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.inject.Inject;

import java.util.ArrayList;
import java.util.List;

import static com.facebook.presto.hive.metastore.MetastoreUtil.getFileSystem;
import static com.facebook.presto.hive.metastore.MetastoreUtil.renameFile;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.util.concurrent.Futures.catching;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.util.Objects.requireNonNull;

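/**
 * Commits staged data files by renaming each file from its temporary write name to its
 * final target name under the partition's write path. Renames run on a dedicated
 * executor so that a large number of files can be committed in parallel.
 *
 * <p>Instances are intended to be obtained through dependency injection (note the
 * {@code @Inject} constructor and the {@code @ForFileRename} executor). The sketch
 * below assumes such a binding and an already prepared {@code session} and
 * {@code partitionUpdates}:
 *
 * <pre>{@code
 * // committer is an injected StagingFileCommitter
 * ListenableFuture<Void> commit = committer.commitFiles(
 *         session, "schema", "table", "/path/to/table", false, partitionUpdates);
 * }</pre>
 */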
public class HiveStagingFileCommitter
        implements StagingFileCommitter
{
    private final HdfsEnvironment hdfsEnvironment;
    private final ListeningExecutorService fileRenameExecutor;

    @Inject
    public HiveStagingFileCommitter(
            HdfsEnvironment hdfsEnvironment,
            @ForFileRename ListeningExecutorService fileRenameExecutor)
    {
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.fileRenameExecutor = requireNonNull(fileRenameExecutor, "fileRenameExecutor is null");
    }

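    /**
     * Renames every staged file described by the given partition updates from its write
     * file name to its target file name and returns a future that completes when all
     * renames have succeeded. If any rename fails, the combined future is cancelled so
     * that outstanding renames can be interrupted, and the failure is propagated to the
     * caller.
     */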
    @Override
    public ListenableFuture<Void> commitFiles(
            ConnectorSession session,
            String schemaName,
            String tableName,
            String tablePath,
            boolean isCreateTable,
            List<PartitionUpdate> partitionUpdates)
    {
        HdfsContext context = new HdfsContext(session, schemaName, tableName, tablePath, isCreateTable);
        List<ListenableFuture<Void>> commitFutures = new ArrayList<>();

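        // Submit one asynchronous rename per staged file across all partition updates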
        for (PartitionUpdate partitionUpdate : partitionUpdates) {
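            // Resolve the file system once per partition update; its staged files all live under the write path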
            Path path = partitionUpdate.getWritePath();
            FileSystem fileSystem = getFileSystem(hdfsEnvironment, context, path);
            for (FileWriteInfo fileWriteInfo : partitionUpdate.getFileWriteInfos()) {
                checkState(!fileWriteInfo.getWriteFileName().equals(fileWriteInfo.getTargetFileName()), "Write file name must not be the same as target file name");
                Path source = new Path(path, fileWriteInfo.getWriteFileName());
                Path target = new Path(path, fileWriteInfo.getTargetFileName());
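                // Rename on the dedicated file-rename executor so files are committed in parallel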
                commitFutures.add(fileRenameExecutor.submit(() -> {
                    renameFile(fileSystem, source, target);
                    return null;
                }));
            }
        }

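        // Combine the rename futures into a single future that succeeds only when every rename succeeds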
        ListenableFuture<Void> result = whenAllSucceed(commitFutures).call(() -> null, directExecutor());
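        // If any rename fails, attempt to cancel the renames still in flight and rethrow the failure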
        return catching(
                result,
                RuntimeException.class,
                e -> {
                    checkState(e != null, "Unexpected null exception caught during commitFiles");
                    result.cancel(true);
                    throw e;
                },
                directExecutor());
    }
}