TestingExtendedHiveMetastore.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.MetastoreOperationResult;
import com.facebook.presto.hive.metastore.Partition;
import com.facebook.presto.hive.metastore.PartitionStatistics;
import com.facebook.presto.hive.metastore.PartitionWithStatistics;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.hive.metastore.UnimplementedHiveMetastore;
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.security.PrincipalType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

import static com.facebook.presto.hive.metastore.MetastoreUtil.toPartitionValues;

public class TestingExtendedHiveMetastore
        extends UnimplementedHiveMetastore
{
    private final Map<String, Long> lastDataCommitTimes = new HashMap<>();
    private final Map<String, Table> tables = new HashMap<>();
    private final Map<String, Partition> partitions = new HashMap<>();

    @Override
    public List<String> getAllDatabases(MetastoreContext metastoreContext)
    {
        return ImmutableList.of("hive_test");
    }

    @Override
    public Optional<Database> getDatabase(MetastoreContext metastoreContext, String databaseName)
    {
        return Optional.of(new Database(databaseName, Optional.of("/"), "test_owner", PrincipalType.USER, Optional.empty(), ImmutableMap.of(), Optional.of("testcatalog")));
    }

    @Override
    public MetastoreOperationResult createTable(MetastoreContext metastoreContext, Table table, PrincipalPrivileges principalPrivileges, List<TableConstraint<String>> constraints)
    {
        String tableKey = createTableKey(table.getDatabaseName(), table.getTableName());
        tables.put(tableKey, table);
        long currentTime = System.currentTimeMillis() / 1000;
        lastDataCommitTimes.put(tableKey, currentTime);

        return new MetastoreOperationResult(ImmutableList.of(currentTime));
    }

    @Override
    public Optional<Table> getTable(MetastoreContext metastoreContext, String databaseName, String tableName)
    {
        String tableKey = createTableKey(databaseName, tableName);
        return Optional.ofNullable(tables.get(tableKey));
    }

    @Override
    public void dropTable(MetastoreContext metastoreContext, String databaseName, String tableName, boolean deleteData)
    {
        String tableKey = createTableKey(databaseName, tableName);
        lastDataCommitTimes.remove(tableKey);
        tables.remove(tableKey);
    }

    @Override
    public void updateTableStatistics(MetastoreContext metastoreContext, String databaseName, String tableName, Function<PartitionStatistics, PartitionStatistics> update)
    {
    }

    @Override
    public MetastoreOperationResult addPartitions(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionWithStatistics> partitions)
    {
        List<Long> times = new ArrayList<>();
        for (PartitionWithStatistics partition : partitions) {
            String partitionKey = createPartitionKey(databaseName, tableName, partition.getPartitionName());
            Partition oldPartition = this.partitions.put(partitionKey, partition.getPartition());

            if (oldPartition != null) {
                String oldLocation = oldPartition.getStorage().getLocation();
                String newLocation = partition.getPartition().getStorage().getLocation();

                // Use old data commit time if the location does not change.
                if (oldLocation.equals(newLocation)) {
                    times.add(lastDataCommitTimes.get(partitionKey));
                }
                else {
                    // Update the data commit time if the partition location changes.
                    // Adding 1000 to ensure their times are different since their data commit times are compared in seconds.
                    long currentTime = System.currentTimeMillis() / 1000;
                    lastDataCommitTimes.put(partitionKey, currentTime);
                    times.add(currentTime);
                }
            }
            else {
                // If the partition is new, add the data commit time.
                long currentTime = System.currentTimeMillis() / 1000;
                lastDataCommitTimes.put(partitionKey, currentTime);
                times.add(currentTime);
            }
        }
        return new MetastoreOperationResult(times);
    }

    @Override
    public MetastoreOperationResult alterPartition(MetastoreContext metastoreContext, String databaseName, String tableName, PartitionWithStatistics partition)
    {
        String partitionKey = createPartitionKey(databaseName, tableName, partition.getPartitionName());
        Partition oldPartition = partitions.get(partitionKey);
        partitions.put(partitionKey, partition.getPartition());

        // When its location changes, we should generate a new commit time.
        if (oldPartition != null && oldPartition.getStorage().getLocation().equals(partition.getPartition().getStorage().getLocation())) {
            lastDataCommitTimes.put(partitionKey, System.currentTimeMillis() / 1000);
        }

        if (!lastDataCommitTimes.containsKey(partitionKey)) {
            return new MetastoreOperationResult(ImmutableList.of());
        }
        return new MetastoreOperationResult(ImmutableList.of(lastDataCommitTimes.get(partitionKey)));
    }

    @Override
    public Optional<Partition> getPartition(MetastoreContext metastoreContext, String databaseName, String tableName, List<String> partitionValues)
    {
        String partitionKey = createPartitionKey(databaseName, tableName, partitionValues);
        long time = lastDataCommitTimes.getOrDefault(partitionKey, 0L);

        Partition partition = partitions.get(partitionKey);
        if (partition != null) {
            Partition.Builder builder = Partition.builder(partition)
                    .setLastDataCommitTime(time);
            return Optional.ofNullable(builder.build());
        }
        return Optional.empty();
    }

    @Override
    public Map<String, Optional<Partition>> getPartitionsByNames(MetastoreContext metastoreContext, String databaseName, String tableName, List<PartitionNameWithVersion> partitionNamesWithVersion)
    {
        Map<String, Optional<Partition>> result = new HashMap<>();
        for (PartitionNameWithVersion partitionNameWithVersion : partitionNamesWithVersion) {
            List<String> partitionValues = toPartitionValues(partitionNameWithVersion.getPartitionName());
            String partitionKey = createPartitionKey(databaseName, tableName, partitionValues);
            long time = lastDataCommitTimes.getOrDefault(partitionKey, 0L);

            Partition partition = partitions.get(partitionKey);
            if (partition != null) {
                Partition.Builder builder = Partition.builder(partition)
                        .setLastDataCommitTime(time);
                result.put(partitionNameWithVersion.getPartitionName(), Optional.of(builder.build()));
            }
            else {
                result.put(partitionNameWithVersion.getPartitionName(), Optional.empty());
            }
        }
        return result;
    }

    private String createPartitionKey(String databaseName, String tableName, String partitionName)
    {
        List<String> partitionValues = toPartitionValues(partitionName);

        return String.join(".", databaseName, tableName, partitionValues.toString());
    }

    private String createPartitionKey(String databaseName, String tableName, List<String> partitionValues)
    {
        return String.join(".", databaseName, tableName, partitionValues.toString());
    }

    private String createTableKey(String databaseName, String tableName)
    {
        return String.join(".", databaseName, tableName);
    }
}