/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive.metastore;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.common.predicate.Domain;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.hive.ColumnConverterProvider;
import com.facebook.presto.hive.HdfsContext;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveTableHandle;
import com.facebook.presto.hive.HiveType;
import com.facebook.presto.hive.LocationHandle.WriteMode;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.PartitionNotFoundException;
import com.facebook.presto.hive.TableAlreadyExistsException;
import com.facebook.presto.spi.ConnectorSession;
import com.facebook.presto.spi.ErrorCodeSupplier;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.StandardErrorCode;
import com.facebook.presto.spi.TableNotFoundException;
import com.facebook.presto.spi.connector.ConnectorCommitHandle;
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.security.PrestoPrincipal;
import com.facebook.presto.spi.security.PrincipalType;
import com.facebook.presto.spi.security.RoleGrant;
import com.facebook.presto.spi.statistics.ColumnStatisticType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.units.Duration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import javax.annotation.concurrent.GuardedBy;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.facebook.airlift.concurrent.MoreFutures.getFutureValue;
import static com.facebook.presto.common.ErrorType.USER_ERROR;
import static com.facebook.presto.common.RuntimeMetricName.GET_PARTITIONS_BY_NAMES_TIME_NANOS;
import static com.facebook.presto.common.RuntimeMetricName.GET_TABLE_TIME_NANOS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_CORRUPTED_COLUMN_STATISTICS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_FILESYSTEM_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_METASTORE_USER_ERROR;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_PATH_ALREADY_EXISTS;
import static com.facebook.presto.hive.HiveErrorCode.HIVE_TABLE_DROPPED_DURING_QUERY;
import static com.facebook.presto.hive.HiveStatisticsUtil.updatePartitionStatistics;
import static com.facebook.presto.hive.LocationHandle.WriteMode.DIRECT_TO_TARGET_NEW_DIRECTORY;
import static com.facebook.presto.hive.metastore.HiveCommitHandle.EMPTY_HIVE_COMMIT_HANDLE;
import static com.facebook.presto.hive.metastore.HivePrivilegeInfo.HivePrivilege.OWNERSHIP;
import static com.facebook.presto.hive.metastore.MetastoreUtil.PRESTO_QUERY_ID_NAME;
import static com.facebook.presto.hive.metastore.MetastoreUtil.convertPredicateToParts;
import static com.facebook.presto.hive.metastore.MetastoreUtil.createDirectory;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getFileSystem;
import static com.facebook.presto.hive.metastore.MetastoreUtil.getMetastoreHeaders;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isPrestoView;
import static com.facebook.presto.hive.metastore.MetastoreUtil.isUserDefinedTypeEncodingEnabled;
import static com.facebook.presto.hive.metastore.MetastoreUtil.pathExists;
import static com.facebook.presto.hive.metastore.MetastoreUtil.renameFile;
import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;
import static com.facebook.presto.hive.metastore.PrestoTableType.MANAGED_TABLE;
import static com.facebook.presto.hive.metastore.PrestoTableType.MATERIALIZED_VIEW;
import static com.facebook.presto.hive.metastore.PrestoTableType.TEMPORARY_TABLE;
import static com.facebook.presto.hive.metastore.PrestoTableType.VIRTUAL_VIEW;
import static com.facebook.presto.hive.metastore.Statistics.ReduceOperator.SUBTRACT;
import static com.facebook.presto.hive.metastore.Statistics.merge;
import static com.facebook.presto.hive.metastore.Statistics.reduce;
import static com.facebook.presto.spi.StandardErrorCode.ALREADY_EXISTS;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.StandardErrorCode.TRANSACTION_CONFLICT;
import static com.facebook.presto.spi.WarningCollector.NOOP;
import static com.facebook.presto.spi.security.PrincipalType.USER;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Verify.verify;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.util.concurrent.Futures.whenAllSucceed;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hive.common.FileUtils.makePartName;

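/**
 * A metastore wrapper that buffers table and partition mutations for the duration of a
 * connector transaction and applies them to the underlying {@link ExtendedHiveMetastore}
 * on {@link #commit()}, or undoes them on {@link #rollback()}.
 * <p>
 * A minimal lifecycle sketch (illustrative only; the {@code hdfsContext} value and the
 * schema/table names are assumed to be supplied by the caller):
 * <pre>{@code
 * metastore.dropTable(hdfsContext, "test_db", "test_table"); // buffered, not yet applied
 * ConnectorCommitHandle handle = metastore.commit();         // applies buffered operations
 * }</pre>
 */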
public class SemiTransactionalHiveMetastore
{
    private static final Logger log = Logger.get(SemiTransactionalHiveMetastore.class);
    private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TABLE = 100;
    private static final int MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TRANSACTION = 10_000;

    private final ExtendedHiveMetastore delegate;
    private final HdfsEnvironment hdfsEnvironment;
    private final ListeningExecutorService renameExecutor;
    private final ColumnConverterProvider columnConverterProvider;
    private final boolean skipDeletionForAlter;
    private final boolean skipTargetCleanupOnRollback;
    private final boolean undoMetastoreOperationsEnabled;

    // Cache lastDataCommitTime for read queries.
    @GuardedBy("this")
    private final Map<SchemaTableName, List<Long>> lastDataCommitTimesForRead = new HashMap<>();

    @GuardedBy("this")
    private final Map<SchemaTableName, Action<TableAndMore>> tableActions = new HashMap<>();
    @GuardedBy("this")
    private final Map<SchemaTableName, Map<List<String>, Action<PartitionAndMore>>> partitionActions = new HashMap<>();
    @GuardedBy("this")
    private final List<DeclaredIntentionToWrite> declaredIntentionsToWrite = new ArrayList<>();
    @GuardedBy("this")
    private ExclusiveOperation bufferedExclusiveOperation;
    @GuardedBy("this")
    private State state = State.EMPTY;
    private boolean throwOnCleanupFailure;

    public SemiTransactionalHiveMetastore(
            HdfsEnvironment hdfsEnvironment,
            ExtendedHiveMetastore delegate,
            ListeningExecutorService renameExecutor,
            boolean skipDeletionForAlter,
            boolean skipTargetCleanupOnRollback,
            boolean undoMetastoreOperationsEnabled,
            ColumnConverterProvider columnConverterProvider)
    {
        this.hdfsEnvironment = requireNonNull(hdfsEnvironment, "hdfsEnvironment is null");
        this.delegate = requireNonNull(delegate, "delegate is null");
        this.renameExecutor = requireNonNull(renameExecutor, "renameExecutor is null");
        this.columnConverterProvider = requireNonNull(columnConverterProvider, "columnConverterProvider is null");
        this.skipDeletionForAlter = skipDeletionForAlter;
        this.skipTargetCleanupOnRollback = skipTargetCleanupOnRollback;
        this.undoMetastoreOperationsEnabled = undoMetastoreOperationsEnabled;
    }

    public ColumnConverterProvider getColumnConverterProvider()
    {
        return columnConverterProvider;
    }

    public synchronized List<String> getAllDatabases(MetastoreContext metastoreContext)
    {
        checkReadable();
        return delegate.getAllDatabases(metastoreContext);
    }

    public synchronized Optional<Database> getDatabase(MetastoreContext metastoreContext, String databaseName)
    {
        checkReadable();
        return delegate.getDatabase(metastoreContext, databaseName);
    }

    public synchronized Optional<List<String>> getAllTables(MetastoreContext metastoreContext, String databaseName)
    {
        checkReadable();
        if (!tableActions.isEmpty()) {
            throw new UnsupportedOperationException("Listing all tables after adding/dropping/altering tables/views in a transaction is not supported");
        }
        return delegate.getAllTables(metastoreContext, databaseName);
    }

    public Optional<Table> getTable(MetastoreContext metastoreContext, String databaseName, String tableName)
    {
        HiveTableHandle hiveTableHandle = new HiveTableHandle(databaseName, tableName);
        return getTable(metastoreContext, hiveTableHandle);
    }

    public synchronized Optional<Table> getTable(MetastoreContext metastoreContext, HiveTableHandle hiveTableHandle)
    {
        checkReadable();
        Action<TableAndMore> tableAction = tableActions.get(hiveTableHandle.getSchemaTableName());
        if (tableAction == null) {
            return metastoreContext.getRuntimeStats().recordWallTime(GET_TABLE_TIME_NANOS, () -> delegate.getTable(metastoreContext, hiveTableHandle));
        }
        switch (tableAction.getType()) {
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                return Optional.of(tableAction.getData().getAugmentedTableForInTransactionRead());
            case DROP:
                return Optional.empty();
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized List<TableConstraint<String>> getTableConstraints(MetastoreContext metastoreContext, String databaseName, String tableName)
    {
        checkReadable();
        Action<TableAndMore> tableAction = tableActions.get(new SchemaTableName(databaseName, tableName));
        if (tableAction == null) {
            return delegate.getTableConstraints(metastoreContext, databaseName, tableName);
        }
        return emptyList();
    }

    public synchronized Set<ColumnStatisticType> getSupportedColumnStatistics(MetastoreContext metastoreContext, Type type)
    {
        return delegate.getSupportedColumnStatistics(metastoreContext, type);
    }

    public synchronized PartitionStatistics getTableStatistics(MetastoreContext metastoreContext, String databaseName, String tableName)
    {
        checkReadable();
        Action<TableAndMore> tableAction = tableActions.get(new SchemaTableName(databaseName, tableName));
        if (tableAction == null) {
            return delegate.getTableStatistics(metastoreContext, databaseName, tableName);
        }
        switch (tableAction.getType()) {
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                return tableAction.getData().getStatistics();
            case DROP:
                return PartitionStatistics.empty();
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized Map<String, PartitionStatistics> getPartitionStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Set<String> partitionNames)
    {
        checkReadable();
        Optional<Table> table = getTable(metastoreContext, databaseName, tableName);
        if (!table.isPresent()) {
            return ImmutableMap.of();
        }
        TableSource tableSource = getTableSource(databaseName, tableName);
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(table.get().getSchemaTableName(), k -> new HashMap<>());
        ImmutableSet.Builder<String> partitionNamesToQuery = ImmutableSet.builder();
        ImmutableMap.Builder<String, PartitionStatistics> resultBuilder = ImmutableMap.builder();
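        // Partition names touched by a buffered action in this transaction are answered from that
        // action; all remaining names are collected into a single delegate lookup after the loop.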
        for (String partitionName : partitionNames) {
            List<String> partitionValues = toPartitionValues(partitionName);
            Action<PartitionAndMore> partitionAction = partitionActionsOfTable.get(partitionValues);
            if (partitionAction == null) {
                switch (tableSource) {
                    case PRE_EXISTING_TABLE:
                        partitionNamesToQuery.add(partitionName);
                        break;
                    case CREATED_IN_THIS_TRANSACTION:
                        resultBuilder.put(partitionName, PartitionStatistics.empty());
                        break;
                    default:
                        throw new UnsupportedOperationException("unknown table source");
                }
            }
            else {
                resultBuilder.put(partitionName, partitionAction.getData().getStatistics());
            }
        }

        Map<String, PartitionStatistics> delegateResult = delegate.getPartitionStatistics(metastoreContext, databaseName, tableName, partitionNamesToQuery.build());
        if (!delegateResult.isEmpty()) {
            resultBuilder.putAll(delegateResult);
        }
        else {
            partitionNamesToQuery.build().forEach(partitionName -> resultBuilder.put(partitionName, PartitionStatistics.empty()));
        }
        return resultBuilder.build();
    }

    /**
     * This method can only be called when the table is known to exist
     */
    @GuardedBy("this")
    private TableSource getTableSource(String databaseName, String tableName)
    {
        checkHoldsLock();

        checkReadable();
        Action<TableAndMore> tableAction = tableActions.get(new SchemaTableName(databaseName, tableName));
        if (tableAction == null) {
            return TableSource.PRE_EXISTING_TABLE;
        }
        switch (tableAction.getType()) {
            case ADD:
                return TableSource.CREATED_IN_THIS_TRANSACTION;
            case ALTER:
                throw new IllegalStateException("Tables are never altered in the current implementation");
            case DROP:
                throw new TableNotFoundException(new SchemaTableName(databaseName, tableName));
            case INSERT_EXISTING:
                return TableSource.PRE_EXISTING_TABLE;
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized HivePageSinkMetadata generatePageSinkMetadata(MetastoreContext metastoreContext, SchemaTableName schemaTableName)
    {
        checkReadable();
        Optional<Table> table = getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName());
        if (!table.isPresent()) {
            return new HivePageSinkMetadata(schemaTableName, Optional.empty(), ImmutableMap.of());
        }
        Map<List<String>, Action<PartitionAndMore>> partitionActionMap = partitionActions.get(schemaTableName);
        Map<List<String>, Optional<Partition>> modifiedPartitionMap;
        if (partitionActionMap == null) {
            modifiedPartitionMap = ImmutableMap.of();
        }
        else {
            ImmutableMap.Builder<List<String>, Optional<Partition>> modifiedPartitionMapBuilder = ImmutableMap.builder();
            for (Map.Entry<List<String>, Action<PartitionAndMore>> entry : partitionActionMap.entrySet()) {
                modifiedPartitionMapBuilder.put(entry.getKey(), getPartitionFromPartitionAction(entry.getValue()));
            }
            modifiedPartitionMap = modifiedPartitionMapBuilder.build();
        }
        return new HivePageSinkMetadata(
                schemaTableName,
                table,
                modifiedPartitionMap);
    }

    public synchronized Optional<List<String>> getAllViews(MetastoreContext metastoreContext, String databaseName)
    {
        checkReadable();
        if (!tableActions.isEmpty()) {
            throw new UnsupportedOperationException("Listing all tables after adding/dropping/altering tables/views in a transaction is not supported");
        }
        return delegate.getAllViews(metastoreContext, databaseName);
    }

    public synchronized void createDatabase(MetastoreContext metastoreContext, Database database)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.createDatabase(metastoreContext, database);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized void dropDatabase(MetastoreContext metastoreContext, String schemaName)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.dropDatabase(metastoreContext, schemaName);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized void renameDatabase(MetastoreContext metastoreContext, String source, String target)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.renameDatabase(metastoreContext, source, target);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    // TODO: Allow updating statistics for 2 tables in the same transaction
    public synchronized void setTableStatistics(MetastoreContext metastoreContext, Table table, PartitionStatistics tableStatistics)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.updateTableStatistics(
                    metastoreContext, table.getDatabaseName(),
                    table.getTableName(),
                    statistics -> updatePartitionStatistics(statistics, tableStatistics));
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    // TODO: Allow updating statistics for 2 tables in the same transaction
    public synchronized void setPartitionStatistics(MetastoreContext metastoreContext, Table table, Map<List<String>, PartitionStatistics> partitionStatisticsMap)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            partitionStatisticsMap.forEach((partitionValues, newPartitionStats) ->
                    delegate.updatePartitionStatistics(
                            metastoreContext,
                            table.getDatabaseName(),
                            table.getTableName(),
                            getPartitionName(table, partitionValues),
                            oldPartitionStats -> updatePartitionStatistics(oldPartitionStats, newPartitionStats)));
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    /**
     * {@code currentPath} needs to be supplied if a writePath exists for the table.
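     * <p>
     * An illustrative call (a hedged sketch; the {@code table}, {@code privileges}, and
     * {@code writePath} values are assumed to be built by the caller):
     * <pre>{@code
     * metastore.createTable(session, table, privileges, Optional.of(writePath), false,
     *         PartitionStatistics.empty(), emptyList());
     * }</pre>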
     */
    public synchronized void createTable(
            ConnectorSession session,
            Table table,
            PrincipalPrivileges principalPrivileges,
            Optional<Path> currentPath,
            boolean ignoreExisting,
            PartitionStatistics statistics,
            List<TableConstraint<String>> constraints)
    {
        setShared();
        // When creating a table, it should never have partition actions. This is just a sanity check.
        checkNoPartitionAction(table.getDatabaseName(), table.getTableName());
        Action<TableAndMore> oldTableAction = tableActions.get(table.getSchemaTableName());
        TableAndMore tableAndMore = new TableAndMore(table, Optional.of(principalPrivileges), currentPath, Optional.empty(), ignoreExisting, statistics, statistics, constraints);
        if (oldTableAction == null) {
            HdfsContext context = new HdfsContext(session, table.getDatabaseName(), table.getTableName(), table.getStorage().getLocation(), true);
            tableActions.put(table.getSchemaTableName(), new Action<>(ActionType.ADD, tableAndMore, context));
            return;
        }
        switch (oldTableAction.getType()) {
            case DROP:
                throw new PrestoException(TRANSACTION_CONFLICT, "Dropping and then recreating the same table in a transaction is not supported");
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                throw new TableAlreadyExistsException(table.getSchemaTableName());
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void dropTable(HdfsContext context, String databaseName, String tableName)
    {
        setShared();
        // Dropping table with partition actions requires cleaning up staging data, which is not implemented yet.
        checkNoPartitionAction(databaseName, tableName);
        SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
        Action<TableAndMore> oldTableAction = tableActions.get(schemaTableName);
        if (oldTableAction == null || oldTableAction.getType() == ActionType.ALTER) {
            tableActions.put(schemaTableName, new Action<>(ActionType.DROP, null, context));
            return;
        }
        switch (oldTableAction.getType()) {
            case DROP:
                throw new TableNotFoundException(schemaTableName);
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                throw new UnsupportedOperationException("dropping a table added/modified in the same transaction is not supported");
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void replaceView(MetastoreContext metastoreContext, String databaseName, String tableName, Table table, PrincipalPrivileges principalPrivileges)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.replaceTable(metastoreContext, databaseName, tableName, table, principalPrivileges);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

    public synchronized void renameTable(MetastoreContext metastoreContext, String databaseName, String tableName, String newDatabaseName, String newTableName)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.renameTable(metastoreContext, databaseName, tableName, newDatabaseName, newTableName);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

    public synchronized void addColumn(MetastoreContext metastoreContext, String databaseName, String tableName, String columnName, HiveType columnType, String columnComment)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.addColumn(metastoreContext, databaseName, tableName, columnName, columnType, columnComment);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

    public synchronized void renameColumn(MetastoreContext metastoreContext, String databaseName, String tableName, String oldColumnName, String newColumnName)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.renameColumn(metastoreContext, databaseName, tableName, oldColumnName, newColumnName);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

    public synchronized void dropColumn(MetastoreContext metastoreContext, String databaseName, String tableName, String columnName)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.dropColumn(metastoreContext, databaseName, tableName, columnName);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

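    // Packages the per-table last-data-commit times reported by a metastore write operation
    // into a commit handle that is surfaced to the engine at commit time.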
    private ConnectorCommitHandle buildCommitHandle(SchemaTableName table, MetastoreOperationResult operationResult)
    {
        Map<SchemaTableName, List<Long>> lastDataCommitTimes = ImmutableMap.of(table, operationResult.getLastDataCommitTimes());
        return new HiveCommitHandle(ImmutableMap.of(), lastDataCommitTimes);
    }

    public synchronized void finishInsertIntoExistingTable(
            ConnectorSession session,
            String databaseName,
            String tableName,
            Path currentLocation,
            List<String> fileNames,
            PartitionStatistics statisticsUpdate)
    {
        // Data can only be inserted into partitions or into unpartitioned tables; it can never be
        // inserted directly into a partitioned table. Therefore, this method assumes that the table is unpartitioned.
        setShared();
        SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
        Action<TableAndMore> oldTableAction = tableActions.get(schemaTableName);
        MetastoreContext metastoreContext = new MetastoreContext(
                session.getIdentity(),
                session.getQueryId(),
                session.getClientInfo(),
                session.getClientTags(),
                session.getSource(),
                getMetastoreHeaders(session),
                isUserDefinedTypeEncodingEnabled(session),
                columnConverterProvider,
                session.getWarningCollector(),
                session.getRuntimeStats());
        if (oldTableAction == null || oldTableAction.getData().getTable().getTableType().equals(TEMPORARY_TABLE)) {
            Table table = getTable(metastoreContext, databaseName, tableName)
                    .orElseThrow(() -> new TableNotFoundException(schemaTableName));

            PartitionStatistics currentStatistics = getTableStatistics(metastoreContext, databaseName, tableName);
            HdfsContext context = new HdfsContext(session, databaseName, tableName, table.getStorage().getLocation(), false);
            tableActions.put(
                    schemaTableName,
                    new Action<>(
                            ActionType.INSERT_EXISTING,
                            new TableAndMore(
                                    table,
                                    Optional.empty(),
                                    Optional.of(currentLocation),
                                    Optional.of(fileNames),
                                    false,
                                    merge(currentStatistics, statisticsUpdate),
                                    statisticsUpdate,
                                    emptyList()),
                            context));
            return;
        }

        switch (oldTableAction.getType()) {
            case DROP:
                throw new TableNotFoundException(schemaTableName);
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                throw new UnsupportedOperationException("Inserting into an unpartitioned table that were added, altered, or inserted into in the same transaction is not supported");
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void truncateUnpartitionedTable(ConnectorSession session, String databaseName, String tableName)
    {
        checkReadable();
        Optional<Table> table = getTable(
                new MetastoreContext(
                        session.getIdentity(),
                        session.getQueryId(),
                        session.getClientInfo(),
                        session.getClientTags(),
                        session.getSource(),
                        getMetastoreHeaders(session),
                        isUserDefinedTypeEncodingEnabled(session),
                        columnConverterProvider,
                        session.getWarningCollector(),
                        session.getRuntimeStats()),
                databaseName,
                tableName);
        SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
        if (!table.isPresent()) {
            throw new TableNotFoundException(schemaTableName);
        }
        if (!table.get().getTableType().equals(MANAGED_TABLE) && !table.get().getTableType().equals(MATERIALIZED_VIEW)) {
            throw new PrestoException(NOT_SUPPORTED, "Cannot delete from non-managed Hive table");
        }
        if (!table.get().getPartitionColumns().isEmpty()) {
            throw new IllegalArgumentException("Table is partitioned");
        }

        Path path = new Path(table.get().getStorage().getLocation());
        HdfsContext context = new HdfsContext(session, databaseName, tableName, table.get().getStorage().getLocation(), false);
        setExclusive((delegate, hdfsEnvironment) -> {
            RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(hdfsEnvironment, context, path, ImmutableSet.of(""), false);
            if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) {
                throw new PrestoException(HIVE_FILESYSTEM_ERROR, format(
                        "Error deleting from unpartitioned table %s. These items can not be deleted: %s",
                        schemaTableName,
                        recursiveDeleteResult.getNotDeletedEligibleItems()));
            }
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public Optional<List<PartitionNameWithVersion>> getPartitionNames(MetastoreContext metastoreContext, String databaseName, String tableName)
    {
        HiveTableHandle hiveTableHandle = new HiveTableHandle(databaseName, tableName);
        return getPartitionNames(metastoreContext, hiveTableHandle);
    }

    public synchronized Optional<List<PartitionNameWithVersion>> getPartitionNames(MetastoreContext metastoreContext, HiveTableHandle hiveTableHandle)
    {
        return doGetPartitionNames(metastoreContext, hiveTableHandle, ImmutableMap.of());
    }

    public synchronized Optional<List<PartitionNameWithVersion>> getPartitionNamesByFilter(MetastoreContext metastoreContext, HiveTableHandle hiveTableHandle, Map<Column, Domain> effectivePredicate)
    {
        return doGetPartitionNames(metastoreContext, hiveTableHandle, effectivePredicate);
    }

    @GuardedBy("this")
    private Optional<List<PartitionNameWithVersion>> doGetPartitionNames(
            MetastoreContext metastoreContext,
            HiveTableHandle hiveTableHandle,
            Map<Column, Domain> partitionPredicates)
    {
        checkHoldsLock();

        checkReadable();
        Optional<Table> table = getTable(metastoreContext, hiveTableHandle);
        if (!table.isPresent()) {
            return Optional.empty();
        }
        List<PartitionNameWithVersion> partitionNames;
        TableSource tableSource = getTableSource(hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName());
        switch (tableSource) {
            case CREATED_IN_THIS_TRANSACTION:
                partitionNames = ImmutableList.of();
                break;
            case PRE_EXISTING_TABLE: {
                Optional<List<PartitionNameWithVersion>> partitionNameResult;
                if (!partitionPredicates.isEmpty()) {
                    partitionNameResult = Optional.of(delegate.getPartitionNamesByFilter(metastoreContext, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName(), partitionPredicates));
                }
                else {
                    partitionNameResult = delegate.getPartitionNames(metastoreContext, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName());
                }
                if (!partitionNameResult.isPresent()) {
                    throw new PrestoException(TRANSACTION_CONFLICT, format("Table %s.%s was dropped by another transaction", hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName()));
                }
                partitionNames = partitionNameResult.get();
                break;
            }
            default:
                throw new UnsupportedOperationException("Unknown table source");
        }
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(table.get().getSchemaTableName(), k -> new HashMap<>());
        ImmutableList.Builder<PartitionNameWithVersion> resultBuilder = ImmutableList.builder();
        // apply in-transaction alterations and drops to the partitions returned by the underlying metastore
        for (PartitionNameWithVersion partitionNameWithVersion : partitionNames) {
            List<String> partitionValues = toPartitionValues(partitionNameWithVersion.getPartitionName());
            Action<PartitionAndMore> partitionAction = partitionActionsOfTable.get(partitionValues);
            if (partitionAction == null) {
                resultBuilder.add(partitionNameWithVersion);
                continue;
            }
            switch (partitionAction.getType()) {
                case ADD:
                    throw new PrestoException(TRANSACTION_CONFLICT, format("Another transaction created partition %s in table %s.%s",
                            partitionValues, hiveTableHandle.getSchemaName(), hiveTableHandle.getTableName()));
                case DROP:
                    // do nothing
                    break;
                case ALTER:
                case INSERT_EXISTING:
                    resultBuilder.add(partitionNameWithVersion);
                    break;
                default:
                    throw new IllegalStateException("Unknown action type");
            }
        }
        // add newly-added partitions to the results from underlying metastore
        if (!partitionActionsOfTable.isEmpty()) {
            List<Column> partitionColumns = table.get().getPartitionColumns();
            List<String> partitionColumnNames = partitionColumns.stream().map(Column::getName).collect(toList());
            List<String> parts = convertPredicateToParts(partitionPredicates);
            for (Action<PartitionAndMore> partitionAction : partitionActionsOfTable.values()) {
                if (partitionAction.getType() == ActionType.ADD) {
                    List<String> values = partitionAction.getData().getPartition().getValues();
                    if (parts.isEmpty() || partitionValuesMatch(values, parts)) {
                        resultBuilder.add(new PartitionNameWithVersion(makePartName(partitionColumnNames, values), Optional.empty()));
                    }
                }
            }
        }
        return Optional.of(resultBuilder.build());
    }

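    // Matches concrete partition values against a filter pattern in which an empty string acts
    // as a wildcard; e.g. values ["2020-01-01", "US"] match pattern ["2020-01-01", ""].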
    private static boolean partitionValuesMatch(List<String> values, List<String> pattern)
    {
        checkArgument(values.size() == pattern.size());
        for (int i = 0; i < values.size(); i++) {
            if (pattern.get(i).isEmpty()) {
                // an empty string matches everything
                continue;
            }
            if (!values.get(i).equals(pattern.get(i))) {
                return false;
            }
        }
        return true;
    }

    public synchronized Optional<Partition> getPartition(MetastoreContext metastoreContext, String databaseName, String tableName, List<String> partitionValues)
    {
        checkReadable();
        TableSource tableSource = getTableSource(databaseName, tableName);
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
        Action<PartitionAndMore> partitionAction = partitionActionsOfTable.get(partitionValues);
        if (partitionAction != null) {
            return getPartitionFromPartitionAction(partitionAction);
        }
        switch (tableSource) {
            case PRE_EXISTING_TABLE:
                return delegate.getPartition(metastoreContext, databaseName, tableName, partitionValues);
            case CREATED_IN_THIS_TRANSACTION:
                return Optional.empty();
            default:
                throw new UnsupportedOperationException("unknown table source");
        }
    }

    public synchronized Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionNameWithVersion> partitionNames)
    {
        checkReadable();
        TableSource tableSource = getTableSource(databaseName, tableName);
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
        ImmutableList.Builder<PartitionNameWithVersion> partitionNamesToQuery = ImmutableList.builder();
        ImmutableMap.Builder<String, Optional<Partition>> resultBuilder = ImmutableMap.builder();
        for (PartitionNameWithVersion partitionNameWithVersion : partitionNames) {
            List<String> partitionValues = toPartitionValues(partitionNameWithVersion.getPartitionName());
            Action<PartitionAndMore> partitionAction = partitionActionsOfTable.get(partitionValues);
            if (partitionAction == null) {
                switch (tableSource) {
                    case PRE_EXISTING_TABLE:
                        partitionNamesToQuery.add(partitionNameWithVersion);
                        break;
                    case CREATED_IN_THIS_TRANSACTION:
                        resultBuilder.put(partitionNameWithVersion.getPartitionName(), Optional.empty());
                        break;
                    default:
                        throw new UnsupportedOperationException("unknown table source");
                }
            }
            else {
                resultBuilder.put(partitionNameWithVersion.getPartitionName(), getPartitionFromPartitionAction(partitionAction));
            }
        }
        Map<String, Optional<Partition>> delegateResult = metastoreContext.getRuntimeStats().recordWallTime(GET_PARTITIONS_BY_NAMES_TIME_NANOS, () -> delegate.getPartitionsByNames(metastoreContext, databaseName, tableName, partitionNamesToQuery.build()));
        resultBuilder.putAll(delegateResult);

        cacheLastDataCommitTimes(delegateResult, databaseName, tableName);

        return resultBuilder.build();
    }

    private synchronized void cacheLastDataCommitTimes(Map<String, Optional<Partition>> existingPartitions, String databaseName, String tableName)
    {
        // Limit the number of entries for each individual table
        List<Long> lastDataCommitTimes = existingPartitions.values().stream()
                .filter(Optional::isPresent)
                .map(partition -> partition.get().getLastDataCommitTime())
                .limit(MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TABLE)
                .collect(toImmutableList());

        // Limit the number of entries for the entire transaction
        int capacity = MAX_LAST_DATA_COMMIT_TIME_ENTRY_PER_TRANSACTION - lastDataCommitTimesForRead.size();
        if (capacity >= lastDataCommitTimes.size()) {
            lastDataCommitTimesForRead.put(new SchemaTableName(databaseName, tableName), lastDataCommitTimes);
        }
        else {
            lastDataCommitTimesForRead.put(new SchemaTableName(databaseName, tableName), lastDataCommitTimes.subList(0, capacity));
        }
    }

    public synchronized void setPartitionLeases(MetastoreContext metastoreContext, String databaseName, String tableName, Map<String, String> partitionNameToLocation, Duration leaseDuration)
    {
        checkReadable();
        Table table = getTable(metastoreContext, databaseName, tableName)
                .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
        boolean isPartitioned = table.getPartitionColumns() != null && !table.getPartitionColumns().isEmpty();
        if (table.getTableType().equals(MANAGED_TABLE) && isPartitioned && leaseDuration.toMillis() > 0) {
            delegate.setPartitionLeases(metastoreContext, databaseName, tableName, partitionNameToLocation, leaseDuration);
        }
    }

    private static Optional<Partition> getPartitionFromPartitionAction(Action<PartitionAndMore> partitionAction)
    {
        switch (partitionAction.getType()) {
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                return Optional.of(partitionAction.getData().getAugmentedPartitionForInTransactionRead());
            case DROP:
                return Optional.empty();
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void addPartition(
            ConnectorSession session,
            String databaseName,
            String tableName,
            String tablePath,
            boolean isNewTable,
            Partition partition,
            Path currentLocation,
            PartitionStatistics statistics)
    {
        addPartition(session, databaseName, tableName, tablePath, isNewTable, partition, currentLocation, statistics, false);
    }

    /**
     * Adds metadata for a new partition to the metastore.
     *
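     * <p>
     * A hedged usage sketch (the {@code partition} and {@code statistics} values are assumed
     * to be built by the caller; the schema, table, and partition names are illustrative):
     * <pre>{@code
     * metastore.addPartition(session, "test_db", "orders", tablePath, false, partition,
     *         new Path(tablePath, "ds=2020-01-01"), statistics, false);
     * }</pre>
     *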
     * @param session Connector level session
     * @param databaseName Name of the schema
     * @param tableName Name of the table
     * @param tablePath Storage location of the table
     * @param isNewTable Whether the partition is being added to a newly created table rather than an existing one
     * @param partition The new partition object to be added
     * @param currentLocation The location where the data of the new partition is currently written
     * @param statistics The basic statistics and column statistics for the added partition
     * @param isPathValidationNeeded Whether the metastore file path should be validated; this check is enabled by the sync partition code path only
     */
    public synchronized void addPartition(
            ConnectorSession session,
            String databaseName,
            String tableName,
            String tablePath,
            boolean isNewTable,
            Partition partition,
            Path currentLocation,
            PartitionStatistics statistics,
            boolean isPathValidationNeeded)
    {
        setShared();
        checkArgument(getPrestoQueryId(partition).isPresent());
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
        Action<PartitionAndMore> oldPartitionAction = partitionActionsOfTable.get(partition.getValues());
        HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, isNewTable, isPathValidationNeeded);
        if (oldPartitionAction == null) {
            partitionActionsOfTable.put(
                    partition.getValues(),
                    new Action<>(ActionType.ADD, new PartitionAndMore(partition, currentLocation, Optional.empty(), statistics, statistics), context));
            return;
        }
        switch (oldPartitionAction.getType()) {
            case DROP: {
                if (!oldPartitionAction.getContext().getIdentity().getUser().equals(session.getUser())) {
                    throw new PrestoException(TRANSACTION_CONFLICT, "Operation on the same partition with different user in the same transaction is not supported");
                }
                partitionActionsOfTable.put(
                        partition.getValues(),
                        new Action<>(ActionType.ALTER, new PartitionAndMore(partition, currentLocation, Optional.empty(), statistics, statistics), context));
                break;
            }
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                throw new PrestoException(ALREADY_EXISTS, format("Partition already exists for table '%s.%s': %s", databaseName, tableName, partition.getValues()));
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void dropPartition(
            ConnectorSession session,
            String databaseName,
            String tableName,
            String tablePath,
            List<String> partitionValues)
    {
        setShared();
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(new SchemaTableName(databaseName, tableName), k -> new HashMap<>());
        Action<PartitionAndMore> oldPartitionAction = partitionActionsOfTable.get(partitionValues);
        if (oldPartitionAction == null) {
            HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, false);
            partitionActionsOfTable.put(partitionValues, new Action<>(ActionType.DROP, null, context));
            return;
        }
        switch (oldPartitionAction.getType()) {
            case DROP:
                throw new PartitionNotFoundException(new SchemaTableName(databaseName, tableName), partitionValues);
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                throw new PrestoException(
                        NOT_SUPPORTED,
                        format("dropping a partition added in the same transaction is not supported: %s %s %s", databaseName, tableName, partitionValues));
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void finishInsertIntoExistingPartition(
            ConnectorSession session,
            String databaseName,
            String tableName,
            String tablePath,
            List<String> partitionValues,
            Path currentLocation,
            List<String> fileNames,
            PartitionStatistics statisticsUpdate)
    {
        setShared();
        SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.computeIfAbsent(schemaTableName, k -> new HashMap<>());
        Action<PartitionAndMore> oldPartitionAction = partitionActionsOfTable.get(partitionValues);
        MetastoreContext metastoreContext = new MetastoreContext(
                session.getIdentity(),
                session.getQueryId(),
                session.getClientInfo(),
                session.getClientTags(),
                session.getSource(),
                getMetastoreHeaders(session),
                isUserDefinedTypeEncodingEnabled(session),
                columnConverterProvider,
                session.getWarningCollector(),
                session.getRuntimeStats());

        if (oldPartitionAction == null) {
            Partition partition = delegate.getPartition(metastoreContext, databaseName, tableName, partitionValues)
                    .orElseThrow(() -> new PartitionNotFoundException(schemaTableName, partitionValues));
            String partitionName = getPartitionName(metastoreContext, databaseName, tableName, partitionValues);
            PartitionStatistics currentStatistics = delegate.getPartitionStatistics(metastoreContext, databaseName, tableName, ImmutableSet.of(partitionName)).get(partitionName);
            if (currentStatistics == null) {
                throw new PrestoException(HIVE_METASTORE_ERROR, "currentStatistics is null");
            }
            HdfsContext context = new HdfsContext(session, databaseName, tableName, tablePath, false);
            partitionActionsOfTable.put(
                    partitionValues,
                    new Action<>(
                            ActionType.INSERT_EXISTING,
                            new PartitionAndMore(
                                    partition,
                                    currentLocation,
                                    Optional.of(fileNames),
                                    merge(currentStatistics, statisticsUpdate),
                                    statisticsUpdate),
                            context));
            return;
        }

        switch (oldPartitionAction.getType()) {
            case DROP:
                throw new PartitionNotFoundException(schemaTableName, partitionValues);
            case ADD:
            case ALTER:
            case INSERT_EXISTING:
                throw new UnsupportedOperationException("Inserting into a partition that were added, altered, or inserted into in the same transaction is not supported");
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    private String getPartitionName(MetastoreContext metastoreContext, String databaseName, String tableName, List<String> partitionValues)
    {
        Table table = getTable(metastoreContext, databaseName, tableName)
                .orElseThrow(() -> new TableNotFoundException(new SchemaTableName(databaseName, tableName)));
        return getPartitionName(table, partitionValues);
    }

    private String getPartitionName(Table table, List<String> partitionValues)
    {
        List<String> columnNames = table.getPartitionColumns().stream()
                .map(Column::getName)
                .collect(toImmutableList());
        return makePartName(columnNames, partitionValues);
    }

    public synchronized void createRole(MetastoreContext metastoreContext, String role, String grantor)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.createRole(metastoreContext, role, grantor);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized void dropRole(MetastoreContext metastoreContext, String role)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.dropRole(metastoreContext, role);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized Set<String> listRoles(MetastoreContext metastoreContext)
    {
        checkReadable();
        return delegate.listRoles(metastoreContext);
    }

    public synchronized void grantRoles(MetastoreContext metastoreContext, Set<String> roles, Set<PrestoPrincipal> grantees, boolean withAdminOption, PrestoPrincipal grantor)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.grantRoles(metastoreContext, roles, grantees, withAdminOption, grantor);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized void revokeRoles(MetastoreContext metastoreContext, Set<String> roles, Set<PrestoPrincipal> grantees, boolean adminOptionFor, PrestoPrincipal grantor)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.revokeRoles(metastoreContext, roles, grantees, adminOptionFor, grantor);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized Set<RoleGrant> listRoleGrants(MetastoreContext metastoreContext, PrestoPrincipal principal)
    {
        checkReadable();
        return delegate.listRoleGrants(metastoreContext, principal);
    }

    public synchronized Set<HivePrivilegeInfo> listTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal principal)
    {
        checkReadable();
        SchemaTableName schemaTableName = new SchemaTableName(databaseName, tableName);
        Action<TableAndMore> tableAction = tableActions.get(schemaTableName);
        if (tableAction == null) {
            return delegate.listTablePrivileges(metastoreContext, databaseName, tableName, principal);
        }
        switch (tableAction.getType()) {
            case ADD:
            case ALTER: {
                if (principal.getType() == PrincipalType.ROLE) {
                    return ImmutableSet.of();
                }
                if (!principal.getName().equals(tableAction.getData().getTable().getOwner())) {
                    return ImmutableSet.of();
                }
                Collection<HivePrivilegeInfo> privileges = tableAction.getData().getPrincipalPrivileges().getUserPrivileges().get(principal.getName());
                return ImmutableSet.<HivePrivilegeInfo>builder()
                        .addAll(privileges)
                        .add(new HivePrivilegeInfo(OWNERSHIP, true, new PrestoPrincipal(USER, principal.getName()), new PrestoPrincipal(USER, principal.getName())))
                        .build();
            }
            case INSERT_EXISTING:
                return delegate.listTablePrivileges(metastoreContext, databaseName, tableName, principal);
            case DROP:
                throw new TableNotFoundException(schemaTableName);
            default:
                throw new IllegalStateException("Unknown action type");
        }
    }

    public synchronized void grantTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal grantee, Set<HivePrivilegeInfo> privileges)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.grantTablePrivileges(metastoreContext, databaseName, tableName, grantee, privileges);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized void revokeTablePrivileges(MetastoreContext metastoreContext, String databaseName, String tableName, PrestoPrincipal grantee, Set<HivePrivilegeInfo> privileges)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            delegate.revokeTablePrivileges(metastoreContext, databaseName, tableName, grantee, privileges);
            return EMPTY_HIVE_COMMIT_HANDLE;
        });
    }

    public synchronized void dropConstraint(MetastoreContext metastoreContext, String databaseName, String tableName, String constraintName)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.dropConstraint(metastoreContext, databaseName, tableName, constraintName);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

    public synchronized void addConstraint(MetastoreContext metastoreContext, String databaseName, String tableName, TableConstraint<String> tableConstraint)
    {
        setExclusive((delegate, hdfsEnvironment) -> {
            MetastoreOperationResult operationResult = delegate.addConstraint(metastoreContext, databaseName, tableName, tableConstraint);
            return buildCommitHandle(new SchemaTableName(databaseName, tableName), operationResult);
        });
    }

    public synchronized void declareIntentionToWrite(
            HdfsContext context,
            MetastoreContext metastoreContext,
            WriteMode writeMode,
            Path stagingPathRoot,
            Optional<Path> tempPathRoot,
            SchemaTableName schemaTableName,
            boolean temporaryTable)
    {
        setShared();
        if (writeMode == WriteMode.DIRECT_TO_TARGET_EXISTING_DIRECTORY) {
            Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.get(schemaTableName);
            if (partitionActionsOfTable != null && !partitionActionsOfTable.isEmpty()) {
                throw new PrestoException(NOT_SUPPORTED, "Can not insert into a table with a partition that has been modified in the same transaction when Presto is configured to skip temporary directories.");
            }
        }
        declaredIntentionsToWrite.add(new DeclaredIntentionToWrite(writeMode, context, metastoreContext, stagingPathRoot, tempPathRoot, context.getSession().get().getQueryId(), schemaTableName, temporaryTable));
    }

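    /**
     * Commits all buffered metastore operations. Read-only transactions return a handle carrying
     * only the cached last-data-commit times; shared (write) transactions run the full commit
     * protocol in {@code commitShared()}; a buffered exclusive operation is executed directly
     * against the delegate.
     */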
    public synchronized ConnectorCommitHandle commit()
    {
        try {
            switch (state) {
                case EMPTY:
                    // This is for read-only transactions. Therefore, the commit times for writes are empty.
                    return new HiveCommitHandle(lastDataCommitTimesForRead, ImmutableMap.of());
                case SHARED_OPERATION_BUFFERED:
                    // This is for transactions that contain writes.
                    return commitShared();
                case EXCLUSIVE_OPERATION_BUFFERED:
                    // The times for both read and write are empty since neither read nor write is involved.
                    requireNonNull(bufferedExclusiveOperation, "bufferedExclusiveOperation is null");
                    return bufferedExclusiveOperation.execute(delegate, hdfsEnvironment);
                case FINISHED:
                    throw new IllegalStateException("Tried to commit buffered metastore operations after transaction has been committed/aborted");
                default:
                    throw new IllegalStateException("Unknown state");
            }
        }
        finally {
            state = State.FINISHED;
        }
    }

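    /**
     * Rolls back this transaction. Only shared (write) transactions require file system cleanup via
     * {@link #rollbackShared()}; read-only and exclusive transactions have nothing to undo, since a
     * buffered exclusive operation is only executed at commit time.
     */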
    public synchronized void rollback()
    {
        try {
            switch (state) {
                case EMPTY:
                case EXCLUSIVE_OPERATION_BUFFERED:
                    break;
                case SHARED_OPERATION_BUFFERED:
                    rollbackShared();
                    break;
                case FINISHED:
                    throw new IllegalStateException("Tried to rollback buffered metastore operations after transaction has been committed/aborted");
                default:
                    throw new IllegalStateException("Unknown state");
            }
        }
        finally {
            state = State.FINISHED;
        }
    }

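    /**
     * Two-phase commit of a shared (write) transaction: first stage all file system changes and
     * reversible metastore operations through the {@link Committer}, undoing them on failure; then
     * execute the irreversible metastore delete operations, followed by best-effort cleanups.
     */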
    @GuardedBy("this")
    private ConnectorCommitHandle commitShared()
    {
        checkHoldsLock();

        Committer committer = new Committer();
        try {
            for (Map.Entry<SchemaTableName, Action<TableAndMore>> entry : tableActions.entrySet()) {
                SchemaTableName schemaTableName = entry.getKey();
                Action<TableAndMore> action = entry.getValue();
                HdfsContext hdfsContext = action.getContext();
                MetastoreContext metastoreContext = new MetastoreContext(
                        hdfsContext.getIdentity(),
                        hdfsContext.getQueryId().orElse(""),
                        hdfsContext.getClientInfo(),
                        hdfsContext.getClientTags().orElse(Collections.emptySet()),
                        hdfsContext.getSource(),
                        hdfsContext.getSession().flatMap(MetastoreUtil::getMetastoreHeaders),
                        hdfsContext.getSession().map(MetastoreUtil::isUserDefinedTypeEncodingEnabled).orElse(false),
                        columnConverterProvider,
                        hdfsContext.getSession().map(ConnectorSession::getWarningCollector).orElse(NOOP),
                        hdfsContext.getSession().map(ConnectorSession::getRuntimeStats).orElseGet(RuntimeStats::new));
                switch (action.getType()) {
                    case DROP:
                        committer.prepareDropTable(metastoreContext, schemaTableName);
                        break;
                    case ALTER:
                        committer.prepareAlterTable();
                        break;
                    case ADD:
                        committer.prepareAddTable(metastoreContext, hdfsContext, action.getData());
                        break;
                    case INSERT_EXISTING:
                        committer.prepareInsertExistingTable(metastoreContext, hdfsContext, action.getData());
                        break;
                    default:
                        throw new IllegalStateException("Unknown action type");
                }
            }
            for (Map.Entry<SchemaTableName, Map<List<String>, Action<PartitionAndMore>>> tableEntry : partitionActions.entrySet()) {
                SchemaTableName schemaTableName = tableEntry.getKey();
                for (Map.Entry<List<String>, Action<PartitionAndMore>> partitionEntry : tableEntry.getValue().entrySet()) {
                    List<String> partitionValues = partitionEntry.getKey();
                    Action<PartitionAndMore> action = partitionEntry.getValue();
                    HdfsContext hdfsContext = action.getContext();
                    MetastoreContext metastoreContext = new MetastoreContext(
                            hdfsContext.getIdentity(),
                            hdfsContext.getQueryId().orElse(""),
                            hdfsContext.getClientInfo(),
                            hdfsContext.getClientTags().orElse(Collections.emptySet()),
                            hdfsContext.getSource(),
                            hdfsContext.getSession().flatMap(MetastoreUtil::getMetastoreHeaders),
                            hdfsContext.getSession().map(MetastoreUtil::isUserDefinedTypeEncodingEnabled).orElse(false),
                            columnConverterProvider,
                            hdfsContext.getSession().map(ConnectorSession::getWarningCollector).orElse(NOOP),
                            hdfsContext.getSession().map(ConnectorSession::getRuntimeStats).orElseGet(RuntimeStats::new));
                    switch (action.getType()) {
                        case DROP:
                            committer.prepareDropPartition(metastoreContext, schemaTableName, partitionValues);
                            break;
                        case ALTER:
                            committer.prepareAlterPartition(metastoreContext, hdfsContext, action.getData());
                            break;
                        case ADD:
                            committer.prepareAddPartition(metastoreContext, hdfsContext, action.getData());
                            break;
                        case INSERT_EXISTING:
                            committer.prepareInsertExistingPartition(metastoreContext, hdfsContext, action.getData());
                            break;
                        default:
                            throw new IllegalStateException("Unknown action type");
                    }
                }
            }

            // Wait for all renames submitted for "INSERT_EXISTING" action to finish
            ListenableFuture<?> listenableFutureAggregate = whenAllSucceed(committer.getFileRenameFutures()).call(() -> null, directExecutor());
            try {
                getFutureValue(listenableFutureAggregate, PrestoException.class);
            }
            catch (RuntimeException e) {
                listenableFutureAggregate.cancel(true);
                throw e;
            }

            // At this point, all file system operations, whether asynchronously issued or not, have completed successfully.
            // We are moving on to metastore operations now.

            committer.executeAddTableOperations();
            committer.executeAlterPartitionOperations();
            committer.executeAddPartitionOperations();
            committer.executeUpdateStatisticsOperations();
        }
        catch (Throwable t) {
            committer.cancelUnstartedAsyncRenames();

            committer.undoUpdateStatisticsOperations();
            committer.undoAddPartitionOperations();
            committer.undoAddTableOperations();

            committer.waitForAsyncRenamesSuppressThrowables();

            // fileRenameFutures must all come back before any file system cleanups are carried out.
            // Otherwise, files that should be deleted may be created after cleanup is done.
            committer.executeCleanupTasksForAbort(declaredIntentionsToWrite);

            committer.executeRenameTasksForAbort();

            // Partition directory must be put back before relevant metastore operation can be undone
            committer.undoAlterPartitionOperations();

            rollbackShared();

            throw t;
        }

        try {
            // After this line, operations are no longer reversible.
            // The next section will deal with "dropping table/partition". Commit may still fail in
            // this section. Even if commit fails, cleanups, instead of rollbacks, will be executed.

            if (!committer.metastoreDeleteOperations.isEmpty()) {
                committer.executeMetastoreDeleteOperations();
            }

            // If control flow reached this point, this commit is considered successful no matter
            // what happens later. The only kind of operations that haven't been carried out yet
            // are cleanups.

            // Control flow proceeds to the finally block next, where the cleanup tasks are executed.

            return committer.buildCommitHandle();
        }
        finally {
            // In this method, all operations are best-effort clean up operations.
            // If any operation fails, the error will be logged and ignored.
            // Additionally, other clean up operations should still be attempted.

            // Execute deletion tasks
            committer.executeDeletionTasksForFinish();

            // Clean up temporary tables
            deleteTemporaryTableDirectories(declaredIntentionsToWrite, hdfsEnvironment);

            // Clean up empty staging directories (that may recursively contain empty directories)
            committer.deleteEmptyStagingDirectories(declaredIntentionsToWrite);

            // Clean up root temp directories
            deleteTempPathRootDirectory(declaredIntentionsToWrite, hdfsEnvironment);
        }
    }

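    /**
     * Accumulates the file system tasks and metastore operations required to commit a shared
     * transaction, along with the undo and cleanup tasks to run if the commit aborts.
     */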
    private class Committer
    {
        private final AtomicBoolean fileRenameCancelled = new AtomicBoolean(false);
        private final List<ListenableFuture<?>> fileRenameFutures = new ArrayList<>();

        // File system
        // For file system changes, only operations outside of the writing paths (as specified in the declared
        // intentions to write) need rollback tasks scheduled. Files in writing paths are handled by rollbackShared().
        private final List<DirectoryDeletionTask> deletionTasksForFinish = new ArrayList<>();
        private final List<DirectoryCleanUpTask> cleanUpTasksForAbort = new ArrayList<>();
        private final List<DirectoryRenameTask> renameTasksForAbort = new ArrayList<>();

        // Metastore
        private final List<CreateTableOperation> addTableOperations = new ArrayList<>();
        private final Map<SchemaTableName, PartitionAdder> partitionAdders = new HashMap<>();
        private final List<AlterPartitionOperation> alterPartitionOperations = new ArrayList<>();
        private final List<UpdateStatisticsOperation> updateStatisticsOperations = new ArrayList<>();
        private final List<IrreversibleMetastoreOperation> metastoreDeleteOperations = new ArrayList<>();

        // Flag for better error message
        private boolean deleteOnly = true;

        private List<ListenableFuture<?>> getFileRenameFutures()
        {
            return ImmutableList.copyOf(fileRenameFutures);
        }

        private void prepareDropTable(MetastoreContext metastoreContext, SchemaTableName schemaTableName)
        {
            metastoreDeleteOperations.add(new IrreversibleMetastoreOperation(
                    format("drop table %s", schemaTableName),
                    () -> delegate.dropTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName(), true)));
        }

        private void prepareAlterTable()
        {
            deleteOnly = false;

            // Currently, an ALTER action is never constructed for tables. Dropping a table and then re-creating it
            // in the same transaction is not currently supported. Replace the following line with an actual
            // implementation when create-after-drop support is introduced for tables.
            throw new UnsupportedOperationException("Dropping and then creating a table with the same name is not supported");
        }

        private void prepareAddTable(MetastoreContext metastoreContext, HdfsContext context, TableAndMore tableAndMore)
        {
            deleteOnly = false;

            Table table = tableAndMore.getTable();

            if (table.getTableType().equals(TEMPORARY_TABLE)) {
                // do not commit a temporary table to the metastore
                return;
            }

            if (table.getTableType().equals(MANAGED_TABLE) || table.getTableType().equals(MATERIALIZED_VIEW)) {
                String targetLocation = table.getStorage().getLocation();
                checkArgument(!targetLocation.isEmpty(), "target location is empty");
                Optional<Path> currentPath = tableAndMore.getCurrentLocation();
                Path targetPath = new Path(targetLocation);
                if (table.getPartitionColumns().isEmpty() && currentPath.isPresent() && !targetPath.equals(currentPath.get())) {
                    // CREATE TABLE AS SELECT unpartitioned table with staging directory
                    renameDirectory(
                            context,
                            hdfsEnvironment,
                            currentPath.get(),
                            targetPath,
                            () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
                }
                else {
                    // CREATE TABLE AS SELECT for a partitioned table, or
                    // CREATE TABLE AS SELECT for an unpartitioned table without a temporary staging directory, or
                    // CREATE TABLE for a partitioned/unpartitioned table (without data)
                    if (pathExists(context, hdfsEnvironment, targetPath)) {
                        if (currentPath.isPresent() && currentPath.get().equals(targetPath)) {
                            // It is okay to skip directory creation when currentPath is equal to targetPath
                            // because the directory may have been created when creating partition directories.
                            // However, note that the two paths being equal does not guarantee
                            // that a directory was actually created.
                        }
                        else {
                            throw new PrestoException(
                                    HIVE_PATH_ALREADY_EXISTS,
                                    format("Unable to create directory %s: target directory already exists", targetPath));
                        }
                    }
                    else {
                        cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
                        createDirectory(context, hdfsEnvironment, targetPath);
                    }
                }
            }
            addTableOperations.add(new CreateTableOperation(metastoreContext, table, tableAndMore.getPrincipalPrivileges(), tableAndMore.isIgnoreExisting(), tableAndMore.getConstraints()));
            if (!isPrestoView(table)) {
                updateStatisticsOperations.add(new UpdateStatisticsOperation(metastoreContext,
                        table.getSchemaTableName(),
                        Optional.empty(),
                        tableAndMore.getStatisticsUpdate(),
                        false));
            }
        }

        private void prepareInsertExistingTable(MetastoreContext metastoreContext, HdfsContext context, TableAndMore tableAndMore)
        {
            deleteOnly = false;

            Table table = tableAndMore.getTable();

            if (table.getTableType().equals(TEMPORARY_TABLE)) {
                // do not commit a temporary table to the metastore
                return;
            }

            Path targetPath = new Path(table.getStorage().getLocation());
            Path currentPath = tableAndMore.getCurrentLocation().get();
            cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, false));
            if (!targetPath.equals(currentPath)) {
                asyncRename(hdfsEnvironment, renameExecutor, fileRenameCancelled, fileRenameFutures, context, currentPath, targetPath, tableAndMore.getFileNames().get());
            }
            updateStatisticsOperations.add(new UpdateStatisticsOperation(metastoreContext,
                    table.getSchemaTableName(),
                    Optional.empty(),
                    tableAndMore.getStatisticsUpdate(),
                    true));
        }

        private void prepareDropPartition(MetastoreContext metastoreContext, SchemaTableName schemaTableName, List<String> partitionValues)
        {
            metastoreDeleteOperations.add(new IrreversibleMetastoreOperation(
                    format("drop partition %s.%s %s", schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionValues),
                    () -> delegate.dropPartition(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionValues, true)));
        }

        private void prepareAlterPartition(MetastoreContext metastoreContext, HdfsContext context, PartitionAndMore partitionAndMore)
        {
            deleteOnly = false;

            Partition partition = partitionAndMore.getPartition();
            String targetLocation = partition.getStorage().getLocation();
            Optional<Partition> oldPartition = delegate.getPartition(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
            if (!oldPartition.isPresent()) {
                throw new PrestoException(
                        TRANSACTION_CONFLICT,
                        format("The partition that this transaction modified was deleted in another transaction. %s %s", partition.getTableName(), partition.getValues()));
            }
            partition = Partition.builder(partition)
                    .setCreateTime(oldPartition.get().getCreateTime())
                    .setLastDataCommitTime(oldPartition.get().getLastDataCommitTime())
                    .build();
            String partitionName = getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
            PartitionStatistics oldPartitionStatistics = getExistingPartitionStatistics(metastoreContext, partition, partitionName);
            String oldPartitionLocation = oldPartition.get().getStorage().getLocation();
            Path oldPartitionPath = new Path(oldPartitionLocation);

            // Location of the old partition and the new partition can be different because we allow arbitrary directories through LocationService.
            // If the location of the old partition is the same as the location of the new partition:
            // * Rename the old data directory to a temporary path with a special suffix
            // * Remember we will need to delete that directory at the end if transaction successfully commits
            // * Remember we will need to undo the rename if transaction aborts
            // Otherwise,
            // * Remember we will need to delete the location of the old partition at the end if transaction successfully commits
            if (targetLocation.equals(oldPartitionLocation)) {
                Path oldPartitionStagingPath = new Path(oldPartitionPath.getParent(), "_temp_" + oldPartitionPath.getName() + "_" + context.getQueryId());
                renameDirectory(
                        context,
                        hdfsEnvironment,
                        oldPartitionPath,
                        oldPartitionStagingPath,
                        () -> renameTasksForAbort.add(new DirectoryRenameTask(context, oldPartitionStagingPath, oldPartitionPath)));
                if (!skipDeletionForAlter) {
                    deletionTasksForFinish.add(new DirectoryDeletionTask(context, oldPartitionStagingPath));
                }
            }
            else {
                if (!skipDeletionForAlter) {
                    deletionTasksForFinish.add(new DirectoryDeletionTask(context, oldPartitionPath));
                }
            }

            Path currentPath = partitionAndMore.getCurrentLocation();
            Path targetPath = new Path(targetLocation);
            if (!targetPath.equals(currentPath)) {
                renameDirectory(
                        context,
                        hdfsEnvironment,
                        currentPath,
                        targetPath,
                        () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
            }
            // The partition alter must happen regardless of whether the original and current locations are the same,
            // because metadata might change: e.g., storage format, column types, etc.
            alterPartitionOperations.add(new AlterPartitionOperation(metastoreContext,
                    new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate()),
                    new PartitionWithStatistics(oldPartition.get(), partitionName, oldPartitionStatistics)));
        }

        private PartitionStatistics getExistingPartitionStatistics(MetastoreContext metastoreContext, Partition partition, String partitionName)
        {
            try {
                PartitionStatistics statistics = delegate.getPartitionStatistics(metastoreContext, partition.getDatabaseName(), partition.getTableName(), ImmutableSet.of(partitionName))
                        .get(partitionName);
                if (statistics == null) {
                    throw new PrestoException(
                            TRANSACTION_CONFLICT,
                            format("The partition that this transaction modified was deleted in another transaction. %s %s", partition.getTableName(), partition.getValues()));
                }
                return statistics;
            }
            catch (PrestoException e) {
                if (e.getErrorCode().equals(HIVE_CORRUPTED_COLUMN_STATISTICS.toErrorCode())) {
                    log.warn(
                            e,
                            "Corrupted statistics found when altering partition. Table: %s.%s. Partition: %s",
                            partition.getDatabaseName(),
                            partition.getTableName(),
                            partition.getValues());
                    return PartitionStatistics.empty();
                }
                throw e;
            }
        }

        private void prepareAddPartition(MetastoreContext metastoreContext, HdfsContext context, PartitionAndMore partitionAndMore)
        {
            deleteOnly = false;

            Partition partition = partitionAndMore.getPartition();
            String targetLocation = partition.getStorage().getLocation();
            Path currentPath = partitionAndMore.getCurrentLocation();
            Path targetPath = new Path(targetLocation);

            PartitionAdder partitionAdder = partitionAdders.computeIfAbsent(
                    partition.getSchemaTableName(),
                    ignored -> new PartitionAdder(metastoreContext, partition.getDatabaseName(), partition.getTableName(), delegate));

            // We can bypass the file storage path checks for the sync-partition code path,
            // because the file paths have already been verified during an earlier phase of the sync logic.
            if (!context.getIsPathValidationNeeded().orElse(false)) {
                if (pathExists(context, hdfsEnvironment, currentPath)) {
                    if (!targetPath.equals(currentPath)) {
                        renameDirectory(
                                context,
                                hdfsEnvironment,
                                currentPath,
                                targetPath,
                                () -> cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true)));
                    }
                }
                else {
                    cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, true));
                    createDirectory(context, hdfsEnvironment, targetPath);
                }
            }
            String partitionName = getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues());
            partitionAdder.addPartition(new PartitionWithStatistics(partition, partitionName, partitionAndMore.getStatisticsUpdate()));
        }

        private void prepareInsertExistingPartition(MetastoreContext metastoreContext, HdfsContext context, PartitionAndMore partitionAndMore)
        {
            deleteOnly = false;

            Partition partition = partitionAndMore.getPartition();
            Path targetPath = new Path(partition.getStorage().getLocation());
            Path currentPath = partitionAndMore.getCurrentLocation();
            cleanUpTasksForAbort.add(new DirectoryCleanUpTask(context, targetPath, false));
            if (!targetPath.equals(currentPath)) {
                asyncRename(hdfsEnvironment, renameExecutor, fileRenameCancelled, fileRenameFutures, context, currentPath, targetPath, partitionAndMore.getFileNames());
            }
            updateStatisticsOperations.add(new UpdateStatisticsOperation(metastoreContext,
                    partition.getSchemaTableName(),
                    Optional.of(getPartitionName(metastoreContext, partition.getDatabaseName(), partition.getTableName(), partition.getValues())),
                    partitionAndMore.getStatisticsUpdate(),
                    true));
        }

        private void executeCleanupTasksForAbort(Collection<DeclaredIntentionToWrite> declaredIntentionsToWrite)
        {
            Set<String> queryIds = declaredIntentionsToWrite.stream()
                    .map(DeclaredIntentionToWrite::getQueryId)
                    .collect(toImmutableSet());
            for (DirectoryCleanUpTask cleanUpTask : cleanUpTasksForAbort) {
                recursiveDeleteFilesAndLog(cleanUpTask.getContext(), cleanUpTask.getPath(), queryIds, cleanUpTask.isDeleteEmptyDirectory(), "temporary directory commit abort");
            }
        }

        private void executeDeletionTasksForFinish()
        {
            for (DirectoryDeletionTask deletionTask : deletionTasksForFinish) {
                if (!deleteRecursivelyIfExists(deletionTask.getContext(), hdfsEnvironment, deletionTask.getPath())) {
                    logCleanupFailure("Error deleting directory %s", deletionTask.getPath().toString());
                }
            }
        }

        private void executeRenameTasksForAbort()
        {
            for (DirectoryRenameTask directoryRenameTask : renameTasksForAbort) {
                try {
                    // Ignore the task if the source directory doesn't exist.
                    // This is probably because the original rename that we are trying to undo here never succeeded.
                    if (pathExists(directoryRenameTask.getContext(), hdfsEnvironment, directoryRenameTask.getRenameFrom())) {
                        renameDirectory(directoryRenameTask.getContext(), hdfsEnvironment, directoryRenameTask.getRenameFrom(), directoryRenameTask.getRenameTo(), () -> {});
                    }
                }
                catch (Throwable throwable) {
                    logCleanupFailure(throwable, "failed to undo rename of partition directory: %s to %s", directoryRenameTask.getRenameFrom(), directoryRenameTask.getRenameTo());
                }
            }
        }

        private void deleteEmptyStagingDirectories(List<DeclaredIntentionToWrite> declaredIntentionsToWrite)
        {
            for (DeclaredIntentionToWrite declaredIntentionToWrite : declaredIntentionsToWrite) {
                if (declaredIntentionToWrite.getMode() != WriteMode.STAGE_AND_MOVE_TO_TARGET_DIRECTORY) {
                    continue;
                }
                Path path = declaredIntentionToWrite.getStagingPathRoot();
                recursiveDeleteFilesAndLog(declaredIntentionToWrite.getContext(), path, ImmutableSet.of(), true, "staging directory cleanup");
            }
        }

        private void waitForAsyncRenamesSuppressThrowables()
        {
            for (ListenableFuture<?> future : fileRenameFutures) {
                try {
                    future.get();
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                catch (Throwable t) {
                    // ignore
                }
            }
        }

        private void cancelUnstartedAsyncRenames()
        {
            fileRenameCancelled.set(true);
        }

        private void executeAddTableOperations()
        {
            for (CreateTableOperation addTableOperation : addTableOperations) {
                addTableOperation.run(delegate);
            }
        }

        private void executeAlterPartitionOperations()
        {
            for (AlterPartitionOperation alterPartitionOperation : alterPartitionOperations) {
                alterPartitionOperation.run(delegate);
            }
        }

        private void executeAddPartitionOperations()
        {
            for (PartitionAdder partitionAdder : partitionAdders.values()) {
                partitionAdder.execute();
            }
        }

        private void executeUpdateStatisticsOperations()
        {
            for (UpdateStatisticsOperation operation : updateStatisticsOperations) {
                operation.run(delegate);
            }
        }

        private void undoAddPartitionOperations()
        {
            if (!undoMetastoreOperationsEnabled) {
                return;
            }
            for (PartitionAdder partitionAdder : partitionAdders.values()) {
                List<List<String>> partitionsFailedToRollback = partitionAdder.rollback();
                if (!partitionsFailedToRollback.isEmpty()) {
                    logCleanupFailure("Failed to rollback: add_partition for partitions %s.%s %s",
                            partitionAdder.getSchemaName(),
                            partitionAdder.getTableName(),
                            partitionsFailedToRollback);
                }
            }
        }

        private void undoAddTableOperations()
        {
            if (!undoMetastoreOperationsEnabled) {
                return;
            }
            for (CreateTableOperation addTableOperation : addTableOperations) {
                try {
                    addTableOperation.undo(delegate);
                }
                catch (Throwable throwable) {
                    logCleanupFailure(throwable, "failed to rollback: %s", addTableOperation.getDescription());
                }
            }
        }

        private void undoAlterPartitionOperations()
        {
            if (!undoMetastoreOperationsEnabled) {
                return;
            }
            for (AlterPartitionOperation alterPartitionOperation : alterPartitionOperations) {
                try {
                    alterPartitionOperation.undo(delegate);
                }
                catch (Throwable throwable) {
                    logCleanupFailure(throwable, "failed to rollback: %s", alterPartitionOperation.getDescription());
                }
            }
        }

        private void undoUpdateStatisticsOperations()
        {
            if (!undoMetastoreOperationsEnabled) {
                return;
            }
            for (UpdateStatisticsOperation operation : updateStatisticsOperations) {
                try {
                    operation.undo(delegate);
                }
                catch (Throwable throwable) {
                    logCleanupFailure(throwable, "failed to rollback: %s", operation.getDescription());
                }
            }
        }

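        /**
         * Runs the irreversible metastore delete operations. Failures are aggregated into a single
         * PrestoException: up to five causes are attached as suppressed exceptions, and the message
         * distinguishes a delete-only transaction from one whose other operations already committed.
         */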
        private void executeMetastoreDeleteOperations()
        {
            List<String> failedDeletionDescriptions = new ArrayList<>();
            List<Throwable> suppressedExceptions = new ArrayList<>();
            boolean anySucceeded = false;
            ErrorCodeSupplier errorCode = HIVE_METASTORE_ERROR;
            for (IrreversibleMetastoreOperation deleteOperation : metastoreDeleteOperations) {
                try {
                    deleteOperation.run();
                    anySucceeded = true;
                }
                catch (Throwable t) {
                    if (metastoreDeleteOperations.size() == 1 && t instanceof TableNotFoundException) {
                        throw new PrestoException(HIVE_TABLE_DROPPED_DURING_QUERY,
                                "The metastore delete operation failed: " + deleteOperation.getDescription());
                    }
                    if (t instanceof PrestoException) {
                        if (((PrestoException) t).getErrorCode().getType() == USER_ERROR) {
                            errorCode = HIVE_METASTORE_USER_ERROR;
                        }
                    }
                    failedDeletionDescriptions.add(deleteOperation.getDescription());
                    // A limit is needed to avoid having a huge exception object. 5 was chosen arbitrarily.
                    if (suppressedExceptions.size() < 5) {
                        suppressedExceptions.add(t);
                    }
                }
            }
            if (!suppressedExceptions.isEmpty()) {
                StringBuilder message = new StringBuilder();
                if (deleteOnly && !anySucceeded) {
                    message.append("The following metastore delete operations failed: ");
                }
                else {
                    message.append("The transaction didn't commit cleanly. All operations other than the following delete operations were completed: ");
                }
                Joiner.on("; ").appendTo(message, failedDeletionDescriptions);

                PrestoException prestoException = new PrestoException(errorCode, message.toString());
                suppressedExceptions.forEach(prestoException::addSuppressed);
                throw prestoException;
            }
        }

        public ConnectorCommitHandle buildCommitHandle()
        {
            // When a series of operations is executed, only the results of the last kind of metastore operation
            // are returned: partition alterations take precedence over partition creations, which take precedence
            // over table creations.
            Map<SchemaTableName, List<Long>> partitionAlterationResults = buildPartitionAlterationResults();
            if (!partitionAlterationResults.isEmpty()) {
                return new HiveCommitHandle(lastDataCommitTimesForRead, partitionAlterationResults);
            }

            Map<SchemaTableName, List<Long>> partitionCreationResults = buildPartitionCreationResults();
            if (!partitionCreationResults.isEmpty()) {
                return new HiveCommitHandle(lastDataCommitTimesForRead, partitionCreationResults);
            }

            Map<SchemaTableName, List<Long>> tableCreationResults = buildTableCreationResults();
            if (!tableCreationResults.isEmpty()) {
                return new HiveCommitHandle(lastDataCommitTimesForRead, tableCreationResults);
            }

            return EMPTY_HIVE_COMMIT_HANDLE;
        }

        private ImmutableMap<SchemaTableName, List<Long>> buildTableCreationResults()
        {
            ImmutableMap.Builder<SchemaTableName, List<Long>> builder = ImmutableMap.builder();
            for (CreateTableOperation operation : addTableOperations) {
                if (!operation.getOperationResult().isPresent()) {
                    continue;
                }
                builder.put(
                        operation.getTable(),
                        operation.getOperationResult().get().getLastDataCommitTimes());
            }
            return builder.build();
        }

        private ImmutableMap<SchemaTableName, List<Long>> buildPartitionCreationResults()
        {
            ImmutableMap.Builder<SchemaTableName, List<Long>> builder = ImmutableMap.builder();
            for (SchemaTableName schemaTableName : partitionAdders.keySet()) {
                PartitionAdder partitionAdder = partitionAdders.get(schemaTableName);
                if (partitionAdder.getOperationResults().isEmpty()) {
                    continue;
                }

                ImmutableList.Builder<Long> lastCommitTimeBuilder = ImmutableList.builder();
                for (MetastoreOperationResult operationResult : partitionAdder.getOperationResults()) {
                    lastCommitTimeBuilder.addAll(operationResult.getLastDataCommitTimes());
                }
                builder.put(schemaTableName, lastCommitTimeBuilder.build());
            }
            return builder.build();
        }

        private ImmutableMap<SchemaTableName, List<Long>> buildPartitionAlterationResults()
        {
            ImmutableMap.Builder<SchemaTableName, List<Long>> builder = ImmutableMap.builder();
            ImmutableList.Builder<Long> lastCommitTimeBuilder = ImmutableList.builder();
            SchemaTableName table = null;
            for (AlterPartitionOperation operation : alterPartitionOperations) {
                if (!operation.getOperationResult().isPresent()) {
                    continue;
                }
                table = operation.getTable();
                lastCommitTimeBuilder.addAll(operation.getOperationResult().get().getLastDataCommitTimes());
            }

            if (table != null) {
                builder.put(table, lastCommitTimeBuilder.build());
            }
            return builder.build();
        }
    }

    @GuardedBy("this")
    private void rollbackShared()
    {
        checkHoldsLock();

        deleteTemporaryTableDirectories(declaredIntentionsToWrite, hdfsEnvironment);
        deleteTempPathRootDirectory(declaredIntentionsToWrite, hdfsEnvironment);

        for (DeclaredIntentionToWrite declaredIntentionToWrite : declaredIntentionsToWrite) {
            switch (declaredIntentionToWrite.getMode()) {
                case STAGE_AND_MOVE_TO_TARGET_DIRECTORY:
                case DIRECT_TO_TARGET_NEW_DIRECTORY: {
                    if (skipTargetCleanupOnRollback && declaredIntentionToWrite.getMode() == DIRECT_TO_TARGET_NEW_DIRECTORY) {
                        break;
                    }
                    // Note: For STAGE_AND_MOVE_TO_TARGET_DIRECTORY there is no need to clean up the target directory,
                    // as it will only be written to during the commit call, and the commit call cleans up after failures.
                    Path rootPath = declaredIntentionToWrite.getStagingPathRoot();

                    // In the case of DIRECT_TO_TARGET_NEW_DIRECTORY, if the directory is not guaranteed to be
                    // unique to the query, another query or compute engine may see the directory, write data to it,
                    // and export it through the metastore. Cleanup of staging directories must therefore be carried
                    // out conservatively: to be safe, we only delete files that start or end with the query IDs in
                    // this transaction.
                    recursiveDeleteFilesAndLog(
                            declaredIntentionToWrite.getContext(),
                            rootPath,
                            ImmutableSet.of(declaredIntentionToWrite.getQueryId()),
                            true,
                            format("staging/target_new directory rollback for table %s", declaredIntentionToWrite.getSchemaTableName()));
                    break;
                }
                case DIRECT_TO_TARGET_EXISTING_DIRECTORY: {
                    Set<Path> pathsToClean = new HashSet<>();

                    // Check the base directory of the declared intention
                    // * existing partition may also be in this directory
                    // * this is where new partitions are created
                    Path baseDirectory = declaredIntentionToWrite.getStagingPathRoot();
                    pathsToClean.add(baseDirectory);

                    SchemaTableName schemaTableName = declaredIntentionToWrite.getSchemaTableName();
                    MetastoreContext metastoreContext = declaredIntentionToWrite.getMetastoreContext();
                    Optional<Table> table = delegate.getTable(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName());
                    if (table.isPresent()) {
                        // check every existing partition that is outside of the base directory
                        if (!table.get().getPartitionColumns().isEmpty()) {
                            List<PartitionNameWithVersion> partitionNamesWithVersion = delegate.getPartitionNames(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName())
                                    .orElse(ImmutableList.of());
                            for (List<PartitionNameWithVersion> partitionNameBatch : Iterables.partition(partitionNamesWithVersion, 10)) {
                                Collection<Optional<Partition>> partitions = delegate.getPartitionsByNames(metastoreContext, schemaTableName.getSchemaName(), schemaTableName.getTableName(), partitionNameBatch).values();
                                partitions.stream()
                                        .filter(Optional::isPresent)
                                        .map(Optional::get)
                                        .map(partition -> partition.getStorage().getLocation())
                                        .map(Path::new)
                                        .filter(path -> !isSameOrParent(baseDirectory, path))
                                        .forEach(pathsToClean::add);
                            }
                        }
                    }
                    else {
                        logCleanupFailure(
                                "Error rolling back write to table %s.%s. Data directory may contain temporary data. Table was dropped in another transaction.",
                                schemaTableName.getSchemaName(),
                                schemaTableName.getTableName());
                    }

                    // delete any file that starts or ends with the query ID
                    for (Path path : pathsToClean) {
                        // TODO: It is a known deficiency that some empty directories do not get cleaned up in S3.
                        // We cannot delete any of the directories here since we do not know who created them.
                        recursiveDeleteFilesAndLog(
                                declaredIntentionToWrite.getContext(),
                                path,
                                ImmutableSet.of(declaredIntentionToWrite.getQueryId()),
                                false,
                                format("target_existing directory rollback for table %s", schemaTableName));
                    }

                    break;
                }
                default:
                    throw new UnsupportedOperationException("Unknown write mode");
            }
        }
    }

    private static void deleteTemporaryTableDirectories(List<DeclaredIntentionToWrite> declaredIntentionsToWrite, HdfsEnvironment hdfsEnvironment)
    {
        for (DeclaredIntentionToWrite declaredIntentionToWrite : declaredIntentionsToWrite) {
            if (declaredIntentionToWrite.isTemporaryTable()) {
                deleteRecursivelyIfExists(declaredIntentionToWrite.getContext(), hdfsEnvironment, declaredIntentionToWrite.getStagingPathRoot());
            }
        }
    }

    private static void deleteTempPathRootDirectory(List<DeclaredIntentionToWrite> declaredIntentionsToWrite, HdfsEnvironment hdfsEnvironment)
    {
        for (DeclaredIntentionToWrite declaredIntentionToWrite : declaredIntentionsToWrite) {
            if (declaredIntentionToWrite.getTempPathRoot().isPresent()) {
                deleteRecursivelyIfExists(declaredIntentionToWrite.getContext(), hdfsEnvironment, declaredIntentionToWrite.getTempPathRoot().get());
            }
        }
    }

    @VisibleForTesting
    public synchronized void testOnlyCheckIsReadOnly()
    {
        if (state != State.EMPTY) {
            throw new AssertionError("Test did not commit or rollback");
        }
    }

    @VisibleForTesting
    public void testOnlyThrowOnCleanupFailures()
    {
        throwOnCleanupFailure = true;
    }

    @GuardedBy("this")
    private void checkReadable()
    {
        checkHoldsLock();

        switch (state) {
            case EMPTY:
            case SHARED_OPERATION_BUFFERED:
                return;
            case EXCLUSIVE_OPERATION_BUFFERED:
                throw new PrestoException(NOT_SUPPORTED, "Unsupported combination of operations in a single transaction");
            case FINISHED:
                throw new IllegalStateException("Tried to access metastore after transaction has been committed/aborted");
        }
    }

    @GuardedBy("this")
    private void setShared()
    {
        checkHoldsLock();

        checkReadable();
        state = State.SHARED_OPERATION_BUFFERED;
    }

    @GuardedBy("this")
    private void setExclusive(ExclusiveOperation exclusiveOperation)
    {
        checkHoldsLock();

        if (state != State.EMPTY) {
            throw new PrestoException(StandardErrorCode.NOT_SUPPORTED, "Unsupported combination of operations in a single transaction");
        }
        state = State.EXCLUSIVE_OPERATION_BUFFERED;
        bufferedExclusiveOperation = exclusiveOperation;
    }

    @GuardedBy("this")
    private void checkNoPartitionAction(String databaseName, String tableName)
    {
        checkHoldsLock();

        Map<List<String>, Action<PartitionAndMore>> partitionActionsOfTable = partitionActions.get(new SchemaTableName(databaseName, tableName));
        if (partitionActionsOfTable != null && !partitionActionsOfTable.isEmpty()) {
            throw new PrestoException(NOT_SUPPORTED, "Cannot make schema changes to a table/view with modified partitions in the same transaction");
        }
    }

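    /**
     * Returns true if {@code child} is the same path as {@code parent} or a descendant of it;
     * for example, isSameOrParent(new Path("/a/b"), new Path("/a/b/c/d")) returns true.
     */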
    private static boolean isSameOrParent(Path parent, Path child)
    {
        int parentDepth = parent.depth();
        int childDepth = child.depth();
        if (parentDepth > childDepth) {
            return false;
        }
        for (int i = childDepth; i > parentDepth; i--) {
            child = child.getParent();
        }
        return parent.equals(child);
    }

    private void logCleanupFailure(String format, Object... args)
    {
        if (throwOnCleanupFailure) {
            throw new RuntimeException(format(format, args));
        }
        log.warn(format, args);
    }

    private void logCleanupFailure(Throwable t, String format, Object... args)
    {
        if (throwOnCleanupFailure) {
            throw new RuntimeException(format(format, args), t);
        }
        log.warn(t, format, args);
    }

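    /**
     * Submits one rename task per file to the executor. Each task checks the shared
     * {@code cancelled} flag before running, so renames that have not yet started can be
     * skipped when a commit aborts (see {@link Committer#cancelUnstartedAsyncRenames()}).
     */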
    private static void asyncRename(
            HdfsEnvironment hdfsEnvironment,
            ListeningExecutorService executor,
            AtomicBoolean cancelled,
            List<ListenableFuture<?>> fileRenameFutures,
            HdfsContext context,
            Path currentPath,
            Path targetPath,
            List<String> fileNames)
    {
        FileSystem fileSystem = getFileSystem(hdfsEnvironment, context, currentPath);
        for (String fileName : fileNames) {
            Path source = new Path(currentPath, fileName);
            Path target = new Path(targetPath, fileName);
            fileRenameFutures.add(executor.submit(() -> {
                if (cancelled.get()) {
                    return;
                }
                renameFile(fileSystem, source, target);
            }));
        }
    }

    private void recursiveDeleteFilesAndLog(HdfsContext context, Path directory, Set<String> queryIds, boolean deleteEmptyDirectories, String reason)
    {
        RecursiveDeleteResult recursiveDeleteResult = recursiveDeleteFiles(
                hdfsEnvironment,
                context,
                directory,
                queryIds,
                deleteEmptyDirectories);
        if (!recursiveDeleteResult.getNotDeletedEligibleItems().isEmpty()) {
            logCleanupFailure(
                    "Error deleting directory %s for %s. Some eligible items can not be deleted: %s.",
                    directory.toString(),
                    reason,
                    recursiveDeleteResult.getNotDeletedEligibleItems());
        }
        else if (deleteEmptyDirectories && !recursiveDeleteResult.isDirectoryNoLongerExists()) {
            logCleanupFailure(
                    "Error deleting directory %s for %s. Can not delete the directory.",
                    directory.toString(),
                    reason);
        }
    }

    /**
     * Attempt to recursively remove eligible files and/or directories in {@code directory}.
     * <p>
     * When {@code queryIds} is empty, no file (though possibly some directories) will be
     * eligible. To make all files eligible, pass a set containing the empty string.
     * <p>
     * When {@code deleteEmptyDirectories} is true, any empty directory (including directories that
     * were originally empty, and directories that become empty after files prefixed or suffixed with
     * one of the {@code queryIds} are deleted) will be eligible.
     * <p>
     * This method will not delete anything that is neither a directory nor a file.
     *
     * @param queryIds               prefixes or suffixes of the files that should be deleted
     * @param deleteEmptyDirectories whether empty directories should be deleted
     */
    private static RecursiveDeleteResult recursiveDeleteFiles(HdfsEnvironment hdfsEnvironment, HdfsContext context, Path directory, Set<String> queryIds, boolean deleteEmptyDirectories)
    {
        FileSystem fileSystem;
        try {
            fileSystem = hdfsEnvironment.getFileSystem(context, directory);

            if (!fileSystem.exists(directory)) {
                return new RecursiveDeleteResult(true, ImmutableList.of());
            }
        }
        catch (IOException e) {
            ImmutableList.Builder<String> notDeletedItems = ImmutableList.builder();
            notDeletedItems.add(directory.toString() + "/**");
            return new RecursiveDeleteResult(false, notDeletedItems.build());
        }

        return doRecursiveDeleteFiles(fileSystem, directory, queryIds, deleteEmptyDirectories);
    }

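    /**
     * Depth-first deletion: a file is eligible when its name starts or ends with one of the
     * {@code queryIds} (or starts with ".tmp.presto." followed by an ID); a directory is removed
     * only when everything beneath it has been deleted and {@code deleteEmptyDirectories} is set.
     * Files and directories whose names start with ".presto" are never touched.
     */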
    private static RecursiveDeleteResult doRecursiveDeleteFiles(FileSystem fileSystem, Path directory, Set<String> queryIds, boolean deleteEmptyDirectories)
    {
        // don't delete hidden presto directories
        if (directory.getName().startsWith(".presto")) {
            return new RecursiveDeleteResult(false, ImmutableList.of());
        }

        FileStatus[] allFiles;
        try {
            allFiles = fileSystem.listStatus(directory);
        }
        catch (IOException e) {
            ImmutableList.Builder<String> notDeletedItems = ImmutableList.builder();
            notDeletedItems.add(directory.toString() + "/**");
            return new RecursiveDeleteResult(false, notDeletedItems.build());
        }

        boolean allDescendentsDeleted = true;
        ImmutableList.Builder<String> notDeletedEligibleItems = ImmutableList.builder();
        for (FileStatus fileStatus : allFiles) {
            if (fileStatus.isFile()) {
                Path filePath = fileStatus.getPath();
                String fileName = filePath.getName();
                boolean eligible = false;
                // never delete presto dot files
                if (!fileName.startsWith(".presto")) {
                    // a file name that starts with ".tmp.presto" is a staging file; see HiveWriterFactory#createWriter
                    eligible = queryIds.stream().anyMatch(id ->
                            fileName.startsWith(id) || fileName.startsWith(".tmp.presto." + id) || fileName.endsWith(id));
                }
                if (eligible) {
                    if (!deleteIfExists(fileSystem, filePath, false)) {
                        allDescendentsDeleted = false;
                        notDeletedEligibleItems.add(filePath.toString());
                    }
                }
                else {
                    allDescendentsDeleted = false;
                }
            }
            else if (fileStatus.isDirectory()) {
                RecursiveDeleteResult subResult = doRecursiveDeleteFiles(fileSystem, fileStatus.getPath(), queryIds, deleteEmptyDirectories);
                if (!subResult.isDirectoryNoLongerExists()) {
                    allDescendentsDeleted = false;
                }
                if (!subResult.getNotDeletedEligibleItems().isEmpty()) {
                    notDeletedEligibleItems.addAll(subResult.getNotDeletedEligibleItems());
                }
            }
            else {
                allDescendentsDeleted = false;
                notDeletedEligibleItems.add(fileStatus.getPath().toString());
            }
        }
        if (allDescendentsDeleted && deleteEmptyDirectories) {
            verify(notDeletedEligibleItems.build().isEmpty());
            if (!deleteIfExists(fileSystem, directory, false)) {
                return new RecursiveDeleteResult(false, ImmutableList.of(directory.toString() + "/"));
            }
            return new RecursiveDeleteResult(true, ImmutableList.of());
        }
        return new RecursiveDeleteResult(false, notDeletedEligibleItems.build());
    }

    /**
     * Attempts to remove the file or empty directory.
     *
     * @return true if the location no longer exists
     */
    private static boolean deleteIfExists(FileSystem fileSystem, Path path, boolean recursive)
    {
        try {
            // attempt to delete the path
            if (fileSystem.delete(path, recursive)) {
                return true;
            }

            // delete failed
            // check if path still exists
            return !fileSystem.exists(path);
        }
        catch (FileNotFoundException ignored) {
            // path was already removed or never existed
            return true;
        }
        catch (IOException ignored) {
        }
        return false;
    }

    /**
     * Attempts to recursively remove the file or directory tree rooted at {@code path}.
     *
     * @return true if the location no longer exists
     */
    private static boolean deleteRecursivelyIfExists(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path path)
    {
        FileSystem fileSystem;
        try {
            fileSystem = hdfsEnvironment.getFileSystem(context, path);
        }
        catch (IOException ignored) {
            return false;
        }

        return deleteIfExists(fileSystem, path, true);
    }

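    /**
     * Renames {@code source} to {@code target}, creating the parent of {@code target} if necessary,
     * and failing if {@code target} already exists. {@code runWhenPathDoesntExist} is invoked just
     * before the rename so that callers can register cleanup tasks for the target path.
     */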
    private static void renameDirectory(HdfsContext context, HdfsEnvironment hdfsEnvironment, Path source, Path target, Runnable runWhenPathDoesntExist)
    {
        if (pathExists(context, hdfsEnvironment, target)) {
            throw new PrestoException(HIVE_PATH_ALREADY_EXISTS,
                    format("Unable to rename from %s to %s: target directory already exists", source, target));
        }

        if (!pathExists(context, hdfsEnvironment, target.getParent())) {
            createDirectory(context, hdfsEnvironment, target.getParent());
        }

        // The runnable assumes that if the rename fails, it is okay to delete the target directory (if it is empty).
        // This is not strictly true, because a race condition still exists.
        runWhenPathDoesntExist.run();

        try {
            if (!hdfsEnvironment.getFileSystem(context, source).rename(source, target)) {
                throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename %s to %s: rename returned false", source, target));
            }
        }
        catch (IOException e) {
            throw new PrestoException(HIVE_FILESYSTEM_ERROR, format("Failed to rename %s to %s", source, target), e);
        }
    }

    private static Optional<String> getPrestoQueryId(Table table)
    {
        return Optional.ofNullable(table.getParameters().get(PRESTO_QUERY_ID_NAME));
    }

    private static Optional<String> getPrestoQueryId(Partition partition)
    {
        return Optional.ofNullable(partition.getParameters().get(PRESTO_QUERY_ID_NAME));
    }

    private void checkHoldsLock()
    {
        // This method serves a similar purpose at runtime as the @GuardedBy annotation serves during static analysis.
        // It should not have a significant performance impact; if it does, it may be reasonable to remove it.
        // This intentionally does not use checkState.
        if (!Thread.holdsLock(this)) {
            throw new IllegalStateException(format("Thread must hold a lock on the %s", getClass().getSimpleName()));
        }
    }

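    /**
     * Lifecycle of the buffered transaction state: a transaction starts EMPTY, accumulates either
     * shared operations (buffered table/partition actions) or a single exclusive operation, and
     * becomes FINISHED once committed or rolled back.
     */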
    private enum State
    {
        EMPTY,
        SHARED_OPERATION_BUFFERED,
        EXCLUSIVE_OPERATION_BUFFERED,
        FINISHED,
    }

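    /**
     * Kind of buffered change to a table or partition: DROP removes it, ADD creates it,
     * ALTER replaces an existing one, and INSERT_EXISTING appends data to one that already exists.
     */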
    private enum ActionType
    {
        DROP,
        ADD,
        ALTER,
        INSERT_EXISTING
    }

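    /**
     * Records whether a table referenced by this transaction was created within the transaction
     * or already existed before it started.
     */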
    private enum TableSource
    {
        CREATED_IN_THIS_TRANSACTION,
        PRE_EXISTING_TABLE,
        // RECREATED_IN_THIS_TRANSACTION is a possible case, but it is not supported by the current implementation
    }

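    /**
     * A buffered action of the given {@link ActionType} together with its payload and the
     * {@link HdfsContext} under which it was declared. DROP actions carry no payload, so
     * {@link #getData()} may only be called for the other action types.
     */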
    public static class Action<T>
    {
        private final ActionType type;
        private final T data;
        private final HdfsContext context;

        public Action(ActionType type, T data, HdfsContext context)
        {
            this.type = requireNonNull(type, "type is null");
            if (type == ActionType.DROP) {
                checkArgument(data == null, "data is not null");
            }
            else {
                requireNonNull(data, "data is null");
            }
            this.data = data;
            this.context = requireNonNull(context, "context is null");
        }

        public ActionType getType()
        {
            return type;
        }

        public T getData()
        {
            checkState(type != ActionType.DROP);
            return data;
        }

        public HdfsContext getContext()
        {
            return context;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("type", type)
                    .add("data", data)
                    .toString();
        }
    }

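    /**
     * A table together with the auxiliary state needed to commit it: optional privileges,
     * the current (possibly staging) write location and file names, the ignore-existing flag,
     * base statistics, a statistics update, and table constraints.
     */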
    private static class TableAndMore
    {
        private final Table table;
        private final Optional<PrincipalPrivileges> principalPrivileges;
        private final Optional<Path> currentLocation; // unpartitioned table only
        private final Optional<List<String>> fileNames;
        private final boolean ignoreExisting;
        private final PartitionStatistics statistics;
        private final PartitionStatistics statisticsUpdate;
        private final List<TableConstraint<String>> constraints;

        public TableAndMore(
                Table table,
                Optional<PrincipalPrivileges> principalPrivileges,
                Optional<Path> currentLocation,
                Optional<List<String>> fileNames,
                boolean ignoreExisting,
                PartitionStatistics statistics,
                PartitionStatistics statisticsUpdate,
                List<TableConstraint<String>> constraints)
        {
            this.table = requireNonNull(table, "table is null");
            this.principalPrivileges = requireNonNull(principalPrivileges, "principalPrivileges is null");
            this.currentLocation = requireNonNull(currentLocation, "currentLocation is null");
            this.fileNames = requireNonNull(fileNames, "fileNames is null");
            this.ignoreExisting = ignoreExisting;
            this.statistics = requireNonNull(statistics, "statistics is null");
            this.statisticsUpdate = requireNonNull(statisticsUpdate, "statisticsUpdate is null");
            this.constraints = requireNonNull(constraints, "constraints is null");

            checkArgument(!table.getTableType().equals(VIRTUAL_VIEW) || !currentLocation.isPresent(), "currentLocation cannot be supplied for a view");
            checkArgument(!fileNames.isPresent() || currentLocation.isPresent(), "fileNames can be supplied only when currentLocation is supplied");
        }

        public boolean isIgnoreExisting()
        {
            return ignoreExisting;
        }

        public Table getTable()
        {
            return table;
        }

        public PrincipalPrivileges getPrincipalPrivileges()
        {
            checkState(principalPrivileges.isPresent());
            return principalPrivileges.get();
        }

        public Optional<Path> getCurrentLocation()
        {
            return currentLocation;
        }

        public Optional<List<String>> getFileNames()
        {
            return fileNames;
        }

        public PartitionStatistics getStatistics()
        {
            return statistics;
        }

        public PartitionStatistics getStatisticsUpdate()
        {
            return statisticsUpdate;
        }

        public List<TableConstraint<String>> getConstraints()
        {
            return constraints;
        }

        public Table getAugmentedTableForInTransactionRead()
        {
            // Don't augment the location for partitioned tables,
            // as data is never read directly from the partitioned table location
            if (!table.getPartitionColumns().isEmpty()) {
                return table;
            }

            // views don't have a currentLocation
            if (!currentLocation.isPresent()) {
                return table;
            }

            // For an unpartitioned table, this method overrides the table's location field with
            // the staging location, so that if the table is accessed within the ongoing
            // transaction, the staged data can be found and read.
            String currentLocation = this.currentLocation.get().toString();
            if (!currentLocation.equals(table.getStorage().getLocation())) {
                return Table.builder(table)
                        .withStorage(storage -> storage.setLocation(currentLocation))
                        .build();
            }
            return table;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("table", table)
                    .add("principalPrivileges", principalPrivileges)
                    .add("currentLocation", currentLocation)
                    .add("fileNames", fileNames)
                    .add("ignoreExisting", ignoreExisting)
                    .add("statistics", statistics)
                    .add("statisticsUpdate", statisticsUpdate)
                    .add("constraints", constraints)
                    .toString();
        }
    }

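    /**
     * A partition together with the auxiliary state needed to commit it: the current
     * (possibly staging) write location, optional file names, base statistics, and a
     * statistics update.
     */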
    private static class PartitionAndMore
    {
        private final Partition partition;
        private final Path currentLocation;
        private final Optional<List<String>> fileNames;
        private final PartitionStatistics statistics;
        private final PartitionStatistics statisticsUpdate;

        public PartitionAndMore(Partition partition, Path currentLocation, Optional<List<String>> fileNames, PartitionStatistics statistics, PartitionStatistics statisticsUpdate)
        {
            this.partition = requireNonNull(partition, "partition is null");
            this.currentLocation = requireNonNull(currentLocation, "currentLocation is null");
            this.fileNames = requireNonNull(fileNames, "fileNames is null");
            this.statistics = requireNonNull(statistics, "statistics is null");
            this.statisticsUpdate = requireNonNull(statisticsUpdate, "statisticsUpdate is null");
        }

        public Partition getPartition()
        {
            return partition;
        }

        public Path getCurrentLocation()
        {
            return currentLocation;
        }

        public List<String> getFileNames()
        {
            checkState(fileNames.isPresent());
            return fileNames.get();
        }

        public PartitionStatistics getStatistics()
        {
            return statistics;
        }

        public PartitionStatistics getStatisticsUpdate()
        {
            return statisticsUpdate;
        }

        public Partition getAugmentedPartitionForInTransactionRead()
        {
            // This method overrides the partition's location field with the staging location,
            // so that if the partition is accessed within the ongoing transaction, the staged
            // data can be found and read.
            String currentLocation = this.currentLocation.toString();
            if (!currentLocation.equals(partition.getStorage().getLocation())) {
                return Partition.builder(partition)
                        .withStorage(storage -> storage.setLocation(currentLocation))
                        .build();
            }
            return partition;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("partition", partition)
                    .add("currentLocation", currentLocation)
                    .add("fileNames", fileNames)
                    .toString();
        }
    }

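    /**
     * Records a declared intention to write to a table: the write mode, the staging and optional
     * temporary path roots, the query id, and whether the target is a temporary table, so that
     * staged and temporary data can be located later (e.g., for cleanup on rollback).
     */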
    private static class DeclaredIntentionToWrite
    {
        private final WriteMode mode;
        private final HdfsContext context;
        private final String queryId;
        private final Path stagingPathRoot;
        private final Optional<Path> tempPathRoot;
        private final SchemaTableName schemaTableName;
        private final boolean temporaryTable;
        private final MetastoreContext metastoreContext;

        public DeclaredIntentionToWrite(
                WriteMode mode,
                HdfsContext context,
                MetastoreContext metastoreContext,
                Path stagingPathRoot,
                Optional<Path> tempPathRoot,
                String queryId,
                SchemaTableName schemaTableName,
                boolean temporaryTable)
        {
            this.mode = requireNonNull(mode, "mode is null");
            this.context = requireNonNull(context, "context is null");
            this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
            this.stagingPathRoot = requireNonNull(stagingPathRoot, "stagingPathRoot is null");
            this.tempPathRoot = requireNonNull(tempPathRoot, "tempPathRoot is null");
            this.queryId = requireNonNull(queryId, "queryId is null");
            this.schemaTableName = requireNonNull(schemaTableName, "schemaTableName is null");
            this.temporaryTable = temporaryTable;
        }

        public WriteMode getMode()
        {
            return mode;
        }

        public HdfsContext getContext()
        {
            return context;
        }

        public String getQueryId()
        {
            return queryId;
        }

        public Path getStagingPathRoot()
        {
            return stagingPathRoot;
        }

        public Optional<Path> getTempPathRoot()
        {
            return tempPathRoot;
        }

        public SchemaTableName getSchemaTableName()
        {
            return schemaTableName;
        }

        public boolean isTemporaryTable()
        {
            return temporaryTable;
        }

        public MetastoreContext getMetastoreContext()
        {
            return metastoreContext;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("mode", mode)
                    .add("context", context)
                    .add("metastoreContext", metastoreContext)
                    .add("queryId", queryId)
                    .add("stagingPathRoot", stagingPathRoot)
                    .add("tempPathRoot", tempPathRoot)
                    .add("schemaTableName", schemaTableName)
                    .add("temporaryTable", temporaryTable)
                    .toString();
        }
    }

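    /**
     * A deferred cleanup task for a path written during the transaction; when
     * {@code deleteEmptyDirectory} is set, the directory itself is also removed if it is empty.
     */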
    private static class DirectoryCleanUpTask
    {
        private final HdfsContext context;
        private final Path path;
        private final boolean deleteEmptyDirectory;

        public DirectoryCleanUpTask(HdfsContext context, Path path, boolean deleteEmptyDirectory)
        {
            this.context = context;
            this.path = path;
            this.deleteEmptyDirectory = deleteEmptyDirectory;
        }

        public HdfsContext getContext()
        {
            return context;
        }

        public Path getPath()
        {
            return path;
        }

        public boolean isDeleteEmptyDirectory()
        {
            return deleteEmptyDirectory;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("context", context)
                    .add("path", path)
                    .add("deleteEmptyDirectory", deleteEmptyDirectory)
                    .toString();
        }
    }

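    /**
     * A deferred task to delete a path under the given {@link HdfsContext}.
     */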
    private static class DirectoryDeletionTask
    {
        private final HdfsContext context;
        private final Path path;

        public DirectoryDeletionTask(HdfsContext context, Path path)
        {
            this.context = context;
            this.path = path;
        }

        public HdfsContext getContext()
        {
            return context;
        }

        public Path getPath()
        {
            return path;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("context", context)
                    .add("path", path)
                    .toString();
        }
    }

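    /**
     * A deferred task to rename {@code renameFrom} to {@code renameTo} under the given
     * {@link HdfsContext}.
     */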
    private static class DirectoryRenameTask
    {
        private final HdfsContext context;
        private final Path renameFrom;
        private final Path renameTo;

        public DirectoryRenameTask(HdfsContext context, Path renameFrom, Path renameTo)
        {
            this.context = requireNonNull(context, "context is null");
            this.renameFrom = requireNonNull(renameFrom, "renameFrom is null");
            this.renameTo = requireNonNull(renameTo, "renameTo is null");
        }

        public HdfsContext getContext()
        {
            return context;
        }

        public Path getRenameFrom()
        {
            return renameFrom;
        }

        public Path getRenameTo()
        {
            return renameTo;
        }

        @Override
        public String toString()
        {
            return toStringHelper(this)
                    .add("context", context)
                    .add("renameFrom", renameFrom)
                    .add("renameTo", renameTo)
                    .toString();
        }
    }

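    /**
     * A metastore operation that cannot be rolled back once executed, paired with a
     * human-readable description for error reporting.
     */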
    private static class IrreversibleMetastoreOperation
    {
        private final String description;
        private final Runnable action;

        public IrreversibleMetastoreOperation(String description, Runnable action)
        {
            this.description = requireNonNull(description, "description is null");
            this.action = requireNonNull(action, "action is null");
        }

        public String getDescription()
        {
            return description;
        }

        public void run()
        {
            action.run();
        }
    }

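    /**
     * Creates a table in the metastore and remembers enough state to undo the creation.
     * Creation is tolerant of query retries: if the table already exists and carries the
     * same Presto query id, or has the same schema and {@code ignoreExisting} is set,
     * the operation is treated as successful.
     */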
    private static class CreateTableOperation
    {
        private final Table newTable;
        private final PrincipalPrivileges privileges;
        private boolean tableCreated;
        private final boolean ignoreExisting;
        private final List<TableConstraint<String>> constraints;
        private final String queryId;
        private final MetastoreContext metastoreContext;
        private Optional<MetastoreOperationResult> operationResult;

        public CreateTableOperation(MetastoreContext metastoreContext, Table newTable, PrincipalPrivileges privileges, boolean ignoreExisting, List<TableConstraint<String>> constraints)
        {
            requireNonNull(newTable, "newTable is null");
            this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
            this.newTable = newTable;
            this.privileges = requireNonNull(privileges, "privileges is null");
            this.ignoreExisting = ignoreExisting;
            this.constraints = requireNonNull(constraints, "constraints is null");
            this.queryId = getPrestoQueryId(newTable).orElseThrow(() -> new IllegalArgumentException("Query id is not present"));
            this.operationResult = Optional.empty();
        }

        public String getDescription()
        {
            return format("add table %s.%s", newTable.getDatabaseName(), newTable.getTableName());
        }

        public Optional<MetastoreOperationResult> getOperationResult()
        {
            return operationResult;
        }

        public SchemaTableName getTable()
        {
            return new SchemaTableName(newTable.getDatabaseName(), newTable.getTableName());
        }

        public void run(ExtendedHiveMetastore metastore)
        {
            boolean done = false;
            try {
                operationResult = Optional.of(metastore.createTable(metastoreContext, newTable, privileges, constraints));
                done = true;
            }
            catch (RuntimeException e) {
                try {
                    Optional<Table> existingTable = metastore.getTable(metastoreContext, newTable.getDatabaseName(), newTable.getTableName());
                    if (existingTable.isPresent()) {
                        Table table = existingTable.get();
                        Optional<String> existingTableQueryId = getPrestoQueryId(table);
                        if (existingTableQueryId.isPresent() && existingTableQueryId.get().equals(queryId)) {
                            // ignore table if it was already created by the same query during retries
                            done = true;
                        }
                        else {
                            // If the table definition in the metastore differs from what this transaction
                            // wants to create, there is a conflict (e.g., this transaction wants to create
                            // T(a: bigint), but another transaction already created T(a: varchar)).
                            // This may be a problem if there is an insert after this step.
                            if (!hasTheSameSchema(newTable, table)) {
                                e = new PrestoException(TRANSACTION_CONFLICT, format("Table already exists with a different schema: '%s'", newTable.getTableName()));
                            }
                            else {
                                done = ignoreExisting;
                            }
                        }
                    }
                }
                catch (RuntimeException ignored) {
                    // When the table cannot be fetched from the metastore, it is not known whether it was added.
                    // Deleting the table when aborting the commit risks deleting a table that was not added in this transaction.
                    // Not deleting the table may leave garbage behind, but the former is much more dangerous than the latter.
                    // Therefore, the table is not considered added.
                }

                if (!done) {
                    throw e;
                }
            }
            tableCreated = true;
        }

        private boolean hasTheSameSchema(Table newTable, Table existingTable)
        {
            List<Column> newTableColumns = newTable.getDataColumns();
            List<Column> existingTableColumns = existingTable.getDataColumns();

            if (newTableColumns.size() != existingTableColumns.size()) {
                return false;
            }

            for (Column existingColumn : existingTableColumns) {
                if (newTableColumns.stream()
                        .noneMatch(newColumn -> newColumn.getName().equals(existingColumn.getName())
                                && newColumn.getType().equals(existingColumn.getType()))) {
                    return false;
                }
            }
            return true;
        }

        public void undo(ExtendedHiveMetastore metastore)
        {
            if (!tableCreated) {
                return;
            }
            metastore.dropTable(metastoreContext, newTable.getDatabaseName(), newTable.getTableName(), false);
            operationResult = Optional.empty();
        }
    }

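    /**
     * Replaces an existing partition with a new one, keeping the old partition so the
     * change can be undone by altering back.
     */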
    private static class AlterPartitionOperation
    {
        private final PartitionWithStatistics newPartition;
        private final PartitionWithStatistics oldPartition;
        private boolean undo;
        private final MetastoreContext metastoreContext;
        private Optional<MetastoreOperationResult> operationResult;

        public AlterPartitionOperation(MetastoreContext metastoreContext, PartitionWithStatistics newPartition, PartitionWithStatistics oldPartition)
        {
            this.newPartition = requireNonNull(newPartition, "newPartition is null");
            this.oldPartition = requireNonNull(oldPartition, "oldPartition is null");
            this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
            this.operationResult = Optional.empty();
            checkArgument(newPartition.getPartition().getDatabaseName().equals(oldPartition.getPartition().getDatabaseName()));
            checkArgument(newPartition.getPartition().getTableName().equals(oldPartition.getPartition().getTableName()));
            checkArgument(newPartition.getPartition().getValues().equals(oldPartition.getPartition().getValues()));
        }

        public Optional<MetastoreOperationResult> getOperationResult()
        {
            return operationResult;
        }

        public SchemaTableName getTable()
        {
            Partition partition = newPartition.getPartition();
            return new SchemaTableName(partition.getDatabaseName(), partition.getTableName());
        }

        public String getDescription()
        {
            return format(
                    "alter partition %s.%s %s",
                    newPartition.getPartition().getDatabaseName(),
                    newPartition.getPartition().getTableName(),
                    newPartition.getPartition().getValues());
        }

        public void run(ExtendedHiveMetastore metastore)
        {
            undo = true;
            operationResult = Optional.of(metastore.alterPartition(
                    metastoreContext,
                    newPartition.getPartition().getDatabaseName(),
                    newPartition.getPartition().getTableName(),
                    newPartition));
        }

        public void undo(ExtendedHiveMetastore metastore)
        {
            if (!undo) {
                return;
            }
            metastore.alterPartition(metastoreContext, oldPartition.getPartition().getDatabaseName(), oldPartition.getPartition().getTableName(), oldPartition);
            operationResult = Optional.empty();
        }
    }

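    /**
     * Updates table- or partition-level statistics, either merging with or replacing the
     * current statistics; undo subtracts the applied basic statistics and clears column statistics.
     */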
    private static class UpdateStatisticsOperation
    {
        private final SchemaTableName tableName;
        private final Optional<String> partitionName;
        private final PartitionStatistics statistics;
        private final MetastoreContext metastoreContext;
        private final boolean merge;

        private boolean done;

        public UpdateStatisticsOperation(MetastoreContext metastoreContext, SchemaTableName tableName, Optional<String> partitionName, PartitionStatistics statistics, boolean merge)
        {
            this.tableName = requireNonNull(tableName, "tableName is null");
            this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
            this.partitionName = requireNonNull(partitionName, "partitionName is null");
            this.statistics = requireNonNull(statistics, "statistics is null");
            this.merge = merge;
        }

        public void run(ExtendedHiveMetastore metastore)
        {
            if (partitionName.isPresent()) {
                metastore.updatePartitionStatistics(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), partitionName.get(), this::updateStatistics);
            }
            else {
                metastore.updateTableStatistics(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), this::updateStatistics);
            }
            done = true;
        }

        public void undo(ExtendedHiveMetastore metastore)
        {
            if (!done) {
                return;
            }
            if (partitionName.isPresent()) {
                metastore.updatePartitionStatistics(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), partitionName.get(), this::resetStatistics);
            }
            else {
                metastore.updateTableStatistics(metastoreContext, tableName.getSchemaName(), tableName.getTableName(), this::resetStatistics);
            }
        }

        public String getDescription()
        {
            if (partitionName.isPresent()) {
                return format("replace partition parameters %s %s", tableName, partitionName.get());
            }
            return format("replace table parameters %s", tableName);
        }

        private PartitionStatistics updateStatistics(PartitionStatistics currentStatistics)
        {
            return merge ? merge(currentStatistics, statistics) : statistics;
        }

        private PartitionStatistics resetStatistics(PartitionStatistics currentStatistics)
        {
            return new PartitionStatistics(reduce(currentStatistics.getBasicStatistics(), statistics.getBasicStatistics(), SUBTRACT), ImmutableMap.of());
        }
    }

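    /**
     * Buffers partitions to be added to a table and commits them in batches of
     * {@code batchSize}; tracks which partitions were actually created so that
     * {@link #rollback()} can drop them, returning any that failed to roll back.
     */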
    private static class PartitionAdder
    {
        private final String schemaName;
        private final String tableName;
        private final ExtendedHiveMetastore metastore;
        private final int batchSize;
        private final List<PartitionWithStatistics> partitions;
        private final MetastoreContext metastoreContext;
        private List<List<String>> createdPartitionValues = new ArrayList<>();
        private List<MetastoreOperationResult> operationResults;

        public PartitionAdder(MetastoreContext metastoreContext, String schemaName, String tableName, ExtendedHiveMetastore metastore)
        {
            this.metastoreContext = requireNonNull(metastoreContext, "metastoreContext is null");
            this.schemaName = schemaName;
            this.tableName = tableName;
            this.metastore = metastore;
            this.batchSize = metastore.getPartitionCommitBatchSize();
            this.partitions = new ArrayList<>(batchSize);
            this.operationResults = new ArrayList<>();
        }

        public List<MetastoreOperationResult> getOperationResults()
        {
            return operationResults;
        }

        public String getSchemaName()
        {
            return schemaName;
        }

        public String getTableName()
        {
            return tableName;
        }

        public void addPartition(PartitionWithStatistics partition)
        {
            checkArgument(getPrestoQueryId(partition.getPartition()).isPresent());
            partitions.add(partition);
        }

        public void execute()
        {
            List<List<PartitionWithStatistics>> batchedPartitions = Lists.partition(partitions, batchSize);
            for (List<PartitionWithStatistics> batch : batchedPartitions) {
                try {
                    operationResults.add(metastore.addPartitions(metastoreContext, schemaName, tableName, batch));
                    for (PartitionWithStatistics partition : batch) {
                        createdPartitionValues.add(partition.getPartition().getValues());
                    }
                }
                catch (Throwable t) {
                    // Add partitions to the created list conservatively.
                    // Some metastore implementations are known to violate the "all or none" guarantee of the add_partitions call.
                    boolean batchCompletelyAdded = true;
                    for (PartitionWithStatistics partition : batch) {
                        try {
                            Optional<Partition> remotePartition = metastore.getPartition(metastoreContext, schemaName, tableName, partition.getPartition().getValues());
                            // getPrestoQueryId(partition) is guaranteed to be non-empty. It is asserted in PartitionAdder.addPartition.
                            if (remotePartition.isPresent() && getPrestoQueryId(remotePartition.get()).equals(getPrestoQueryId(partition.getPartition()))) {
                                createdPartitionValues.add(partition.getPartition().getValues());
                            }
                            else {
                                batchCompletelyAdded = false;
                            }
                        }
                        catch (Throwable ignored) {
                            // When the partition cannot be fetched from the metastore, it is not known whether it was added.
                            // Deleting the partition when aborting the commit risks deleting a partition that was not added in this transaction.
                            // Not deleting the partition may leave garbage behind, but the former is much more dangerous than the latter.
                            // Therefore, the partition is not added to the createdPartitionValues list here.
                            batchCompletelyAdded = false;
                        }
                    }
                    // If all the partitions were added successfully, the add_partitions operation was effectively
                    // successful even though it threw an exception (communication failure, retry failure after a
                    // communication failure, etc.), so we consider it successful anyway.
                    if (!batchCompletelyAdded) {
                        if (t instanceof TableNotFoundException) {
                            throw new PrestoException(HIVE_TABLE_DROPPED_DURING_QUERY, t);
                        }
                        throw t;
                    }
                }
            }
            partitions.clear();
        }

        public List<List<String>> rollback()
        {
            // drop created partitions
            List<List<String>> partitionsFailedToRollback = new ArrayList<>();
            for (List<String> createdPartitionValue : createdPartitionValues) {
                try {
                    metastore.dropPartition(metastoreContext, schemaName, tableName, createdPartitionValue, false);
                }
                catch (PartitionNotFoundException e) {
                    // Someone else may have deleted the partition we added.
                    // Either way, we are fine because the partition is no longer there.
                }
                catch (Throwable t) {
                    partitionsFailedToRollback.add(createdPartitionValue);
                }
            }
            operationResults = ImmutableList.of();
            createdPartitionValues = partitionsFailedToRollback;
            return partitionsFailedToRollback;
        }
    }

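    /**
     * Result of a recursive delete: whether the directory is gone, and the eligible items
     * that could not be deleted.
     */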
    private static class RecursiveDeleteResult
    {
        private final boolean directoryNoLongerExists;
        private final List<String> notDeletedEligibleItems;

        public RecursiveDeleteResult(boolean directoryNoLongerExists, List<String> notDeletedEligibleItems)
        {
            this.directoryNoLongerExists = directoryNoLongerExists;
            this.notDeletedEligibleItems = notDeletedEligibleItems;
        }

        public boolean isDirectoryNoLongerExists()
        {
            return directoryNoLongerExists;
        }

        public List<String> getNotDeletedEligibleItems()
        {
            return notDeletedEligibleItems;
        }
    }

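    /**
     * A single operation executed with exclusive access to the underlying metastore,
     * given direct access to the delegate and the HDFS environment.
     */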
    private interface ExclusiveOperation
    {
        ConnectorCommitHandle execute(ExtendedHiveMetastore delegate, HdfsEnvironment hdfsEnvironment);
    }
}