// TestInMemoryCachingHiveMetastore.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive.metastore;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.MockHiveMetastore;
import com.facebook.presto.hive.PartitionMutator;
import com.facebook.presto.hive.PartitionNameWithVersion;
import com.facebook.presto.hive.metastore.AbstractCachingHiveMetastore.MetastoreCacheScope;
import com.facebook.presto.hive.metastore.thrift.BridgingHiveMetastore;
import com.facebook.presto.hive.metastore.thrift.HiveCluster;
import com.facebook.presto.hive.metastore.thrift.HiveMetastoreClient;
import com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient;
import com.facebook.presto.hive.metastore.thrift.ThriftHiveMetastore;
import com.facebook.presto.hive.metastore.thrift.ThriftHiveMetastoreStats;
import com.facebook.presto.spi.constraints.NotNullConstraint;
import com.facebook.presto.spi.constraints.PrimaryKeyConstraint;
import com.facebook.presto.spi.constraints.TableConstraint;
import com.facebook.presto.spi.constraints.UniqueConstraint;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.units.Duration;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.presto.hive.HiveTestUtils.HDFS_ENVIRONMENT;
import static com.facebook.presto.hive.metastore.NoopMetastoreCacheStats.NOOP_METASTORE_CACHE_STATS;
import static com.facebook.presto.hive.metastore.Partition.Builder;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.BAD_DATABASE;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.PARTITION_VERSION;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_DATABASE;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_METASTORE_CONTEXT;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION1;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION2;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION_NAME_WITHOUT_VERSION1;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION_NAME_WITHOUT_VERSION2;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION_NAME_WITH_VERSION1;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION_NAME_WITH_VERSION2;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION_VALUES1;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_PARTITION_VALUES2;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_ROLES;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_TABLE;
import static com.facebook.presto.hive.metastore.thrift.MockHiveMetastoreClient.TEST_TABLE_WITH_CONSTRAINTS;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.function.UnaryOperator.identity;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
@Test(singleThreaded = true)
public class TestInMemoryCachingHiveMetastore
{
// Versioned partition names that the partition-versioning tests expect from getPartitionNamesByFilter.
private static final ImmutableList<PartitionNameWithVersion> EXPECTED_PARTITIONS = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1, TEST_PARTITION_NAME_WITH_VERSION2);
// Mock Thrift client; the tests assert on its access count to tell cache hits from misses.
private MockHiveMetastoreClient mockClient;
// Caching metastore under test; recreated by setUp() before every test method.
private InMemoryCachingHiveMetastore metastore;
// Stats object obtained from the ThriftHiveMetastore that backs the metastore under test.
private ThriftHiveMetastoreStats stats;
/**
 * Recreates the mock client, the Thrift metastore on top of it, and the caching metastore
 * under test before every test method, so access counts always start from zero.
 */
@BeforeMethod
public void setUp()
{
    mockClient = new MockHiveMetastoreClient();
    MetastoreClientConfig clientConfig = new MetastoreClientConfig();
    ListeningExecutorService cacheExecutor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("test-%s")));

    // Thrift metastore talking to the mock client; its stats are inspected by some tests
    ThriftHiveMetastore thriftMetastore = new ThriftHiveMetastore(new MockHiveCluster(mockClient), clientConfig, HDFS_ENVIRONMENT);
    stats = thriftMetastore.getStats();

    PartitionMutator partitionMutator = new HivePartitionMutator();
    BridgingHiveMetastore delegate = new BridgingHiveMetastore(thriftMetastore, partitionMutator);
    Duration fiveMinutes = new Duration(5, TimeUnit.MINUTES);
    Duration oneMinute = new Duration(1, TimeUnit.MINUTES);

    metastore = new InMemoryCachingHiveMetastore(
            delegate,
            cacheExecutor,
            false,
            fiveMinutes,
            oneMinute,
            1000,
            false,
            MetastoreCacheScope.ALL,
            0.0,
            clientConfig.getPartitionCacheColumnCountLimit(),
            NOOP_METASTORE_CACHE_STATS);
}
/**
 * Uses the mock client's access count to check that {@code getAllDatabases} reaches the client
 * on the first call only, and again after each cache invalidation.
 */
@Test
public void testGetAllDatabases()
{
    ImmutableList<String> expectedDatabases = ImmutableList.of(TEST_DATABASE);

    // The client is untouched before the first lookup
    assertEquals(mockClient.getAccessCount(), 0);

    // First lookup reaches the client; the repeated lookup must not
    assertEquals(metastore.getAllDatabases(TEST_METASTORE_CONTEXT), expectedDatabases);
    assertEquals(mockClient.getAccessCount(), 1);
    assertEquals(metastore.getAllDatabases(TEST_METASTORE_CONTEXT), expectedDatabases);
    assertEquals(mockClient.getAccessCount(), 1);

    // Clearing everything forces a new client call
    metastore.invalidateAll();
    assertEquals(metastore.getAllDatabases(TEST_METASTORE_CONTEXT), expectedDatabases);
    assertEquals(mockClient.getAccessCount(), 2);

    // Invalidating one specific database forces a new client call as well
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE);
    assertEquals(metastore.getAllDatabases(TEST_METASTORE_CONTEXT), expectedDatabases);
    assertEquals(mockClient.getAccessCount(), 3);
}
/**
 * Uses the mock client's access count to check that {@code getAllTables} is cached and that
 * full, per-database and per-table invalidation each trigger exactly one reload.
 */
@Test
public void testGetAllTable()
{
    ImmutableList<String> expectedTables = ImmutableList.of(TEST_TABLE, TEST_TABLE_WITH_CONSTRAINTS);

    assertEquals(mockClient.getAccessCount(), 0);

    // First lookup reaches the client; the repeated lookup must not
    assertEquals(metastore.getAllTables(TEST_METASTORE_CONTEXT, TEST_DATABASE).get(), expectedTables);
    assertEquals(mockClient.getAccessCount(), 1);
    assertEquals(metastore.getAllTables(TEST_METASTORE_CONTEXT, TEST_DATABASE).get(), expectedTables);
    assertEquals(mockClient.getAccessCount(), 1);

    // Clearing everything forces a reload
    metastore.invalidateAll();
    assertEquals(metastore.getAllTables(TEST_METASTORE_CONTEXT, TEST_DATABASE).get(), expectedTables);
    assertEquals(mockClient.getAccessCount(), 2);

    // Invalidating a specific database also invalidates all table caches mapped to that database
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE);
    assertEquals(metastore.getAllTables(TEST_METASTORE_CONTEXT, TEST_DATABASE).get(), expectedTables);
    assertEquals(mockClient.getAccessCount(), 3);
    assertEquals(metastore.getAllTables(TEST_METASTORE_CONTEXT, TEST_DATABASE).get(), expectedTables);
    assertEquals(mockClient.getAccessCount(), 3);

    // Invalidating a specific database.table also invalidates the table names cache for that database
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE);
    assertEquals(metastore.getAllTables(TEST_METASTORE_CONTEXT, TEST_DATABASE).get(), expectedTables);
    assertEquals(mockClient.getAccessCount(), 4);
}
/**
 * Asserts that listing the tables of {@code BAD_DATABASE} yields an empty {@code Optional}.
 */
// Explicit annotation for consistency with the other tests; the class-level @Test already runs this method.
@Test
public void testInvalidDbGetAllTAbles()
{
    assertFalse(metastore.getAllTables(TEST_METASTORE_CONTEXT, BAD_DATABASE).isPresent());
}
/**
 * Uses the mock client's access count to check that {@code getTable} results are cached per
 * table and that full, per-database and per-table invalidation reload only what they should.
 */
@Test
public void testGetTable()
{
    assertEquals(mockClient.getAccessCount(), 0);

    // First lookup reaches the client; the repeated lookup must not
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE));
    assertEquals(mockClient.getAccessCount(), 1);
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE));
    assertEquals(mockClient.getAccessCount(), 1);

    // Clearing everything forces a reload
    metastore.invalidateAll();
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE));
    assertEquals(mockClient.getAccessCount(), 2);

    // Invalidating a specific database also invalidates all table caches mapped to that database
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE);
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE));
    assertEquals(mockClient.getAccessCount(), 3);
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE));
    assertEquals(mockClient.getAccessCount(), 3);

    // A second table is loaded separately
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS));
    assertEquals(mockClient.getAccessCount(), 4);

    // Invalidating one table leaves the other table's entry untouched
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE);
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS));
    assertEquals(mockClient.getAccessCount(), 4);
    assertNotNull(metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE));
    assertEquals(mockClient.getAccessCount(), 5);
}
/**
 * Asserts that looking up a table in {@code BAD_DATABASE} yields an empty {@code Optional}
 * and that the getTable stats record neither Thrift exceptions nor failures for it.
 */
// Explicit annotation for consistency with the other tests; the class-level @Test already runs this method.
@Test
public void testInvalidDbGetTable()
{
    assertFalse(metastore.getTable(TEST_METASTORE_CONTEXT, BAD_DATABASE, TEST_TABLE).isPresent());

    // A missing table must not be counted as an error
    assertEquals(stats.getGetTable().getThriftExceptions().getTotalCount(), 0);
    assertEquals(stats.getGetTable().getTotalFailures().getTotalCount(), 0);
    assertNotNull(stats.getGetTable().getTime());
}
/**
 * Uses the mock client's access count to check that {@code getPartitionNames} is cached and
 * that full, per-database, per-table and per-partition invalidation each trigger one reload.
 */
@Test
public void testGetPartitionNames()
{
    ImmutableList<PartitionNameWithVersion> expectedNames = ImmutableList.of(TEST_PARTITION_NAME_WITHOUT_VERSION1, TEST_PARTITION_NAME_WITHOUT_VERSION2);

    assertEquals(mockClient.getAccessCount(), 0);

    // First lookup reaches the client; the repeated lookup must not
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 1);
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 1);

    // Clearing everything forces a reload
    metastore.invalidateAll();
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 2);

    // Invalidating the database also invalidates all linked table and partition caches
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE);
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 3);
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 3);

    // Invalidating a specific table also invalidates all linked partition caches
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE);
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 4);
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 4);

    // Invalidating a specific partition causes the partition names to be fetched again
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of("key"), ImmutableList.of("testpartition1"));
    assertEquals(metastore.getPartitionNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE).get(), expectedNames);
    assertEquals(mockClient.getAccessCount(), 5);
}
/**
 * Asserts that every {@code invalidateCache} overload rejects malformed arguments with an
 * {@link IllegalArgumentException} carrying the expected message.
 */
@Test
public void testInvalidInvalidateCache()
{
    // Null database name
    assertThatThrownBy(() -> metastore.invalidateCache(TEST_METASTORE_CONTEXT, null))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("databaseName cannot be null or empty");

    // Null table name
    assertThatThrownBy(() -> metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, null))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("tableName cannot be null or empty");

    // Empty partition columns list
    assertThatThrownBy(() -> metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(), ImmutableList.of()))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("partitionColumnNames cannot be null or empty");

    // Empty partition values list
    assertThatThrownBy(() -> metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of("key"), ImmutableList.of()))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("partitionValues cannot be null or empty");

    // Partition columns and values lists of different lengths
    assertThatThrownBy(() -> metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of("key1", "key2"), ImmutableList.of("testpartition1")))
            .isInstanceOf(IllegalArgumentException.class)
            .hasMessage("partitionColumnNames and partitionValues should be of same length");
}
/**
 * Asserts that requesting partition names for a table in {@code BAD_DATABASE} yields a present
 * but empty list.
 */
@Test
public void testInvalidGetPartitionNames()
{
    assertEquals(
            metastore.getPartitionNames(TEST_METASTORE_CONTEXT, BAD_DATABASE, TEST_TABLE).get(),
            ImmutableList.of());
}
/**
 * Uses the mock client's access count to check that {@code getPartitionNamesByFilter} with an
 * empty filter is cached and reloaded after {@code invalidateAll}.
 */
@Test
public void testGetPartitionNamesByParts()
{
    ImmutableList<PartitionNameWithVersion> expectedNames = ImmutableList.of(TEST_PARTITION_NAME_WITHOUT_VERSION1, TEST_PARTITION_NAME_WITHOUT_VERSION2);

    assertEquals(mockClient.getAccessCount(), 0);

    // First lookup reaches the client
    assertEquals(metastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), expectedNames);
    assertEquals(mockClient.getAccessCount(), 1);

    // The repeated lookup must not reach the client
    assertEquals(metastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), expectedNames);
    assertEquals(mockClient.getAccessCount(), 1);

    // Clearing everything forces a reload
    metastore.invalidateAll();
    assertEquals(metastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), expectedNames);
    assertEquals(mockClient.getAccessCount(), 2);
}
/**
 * {@link PartitionMutator} for tests: on every {@code mutate} call it passes its current
 * version through the supplied function, remembers the result and stamps it on the builder.
 */
private static class MockPartitionMutator
        implements PartitionMutator
{
    private final Function<Long, Long> versionUpdater;
    // Last version handed out; starts at PARTITION_VERSION
    private long version = PARTITION_VERSION;

    public MockPartitionMutator(Function<Long, Long> versionUpdater)
    {
        this.versionUpdater = versionUpdater;
    }

    @Override
    public void mutate(Builder builder, org.apache.hadoop.hive.metastore.api.Partition partition)
    {
        long updatedVersion = versionUpdater.apply(version);
        version = updatedVersion;
        builder.setPartitionVersion(updatedVersion);
    }
}
/**
 * Exercises a metastore built with {@code MetastoreCacheScope.PARTITION} and the boolean flag
 * that the partition-versioning tests set to {@code true}.
 * <p>
 * With an identity version mutator the test asserts that partition-name lookups always reach
 * the client while partition lookups are served from the cache once loaded. It then runs
 * {@link #assertInvalidateCache} once with an increasing and once with a decreasing version.
 */
@Test
public void testCachingWithPartitionVersioning()
{
    MockHiveMetastoreClient client = new MockHiveMetastoreClient();
    MockHiveCluster cluster = new MockHiveCluster(client);
    ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("partition-versioning-test-%s")));
    MockHiveMetastore mockHiveMetastore = new MockHiveMetastore(cluster);
    // identity() keeps the partition version unchanged across fetches
    PartitionMutator mockPartitionMutator = new MockPartitionMutator(identity());
    InMemoryCachingHiveMetastore partitionCachingEnabledmetastore = new InMemoryCachingHiveMetastore(
            new BridgingHiveMetastore(mockHiveMetastore, mockPartitionMutator),
            executor,
            false,
            new Duration(5, TimeUnit.MINUTES),
            new Duration(1, TimeUnit.MINUTES),
            1000,
            true,
            MetastoreCacheScope.PARTITION,
            0.0,
            10_000,
            NOOP_METASTORE_CACHE_STATS);
    ImmutableList<PartitionNameWithVersion> bothPartitions = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1, TEST_PARTITION_NAME_WITH_VERSION2);

    assertEquals(client.getAccessCount(), 0);
    assertEquals(partitionCachingEnabledmetastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), EXPECTED_PARTITIONS);
    assertEquals(client.getAccessCount(), 1);
    assertEquals(partitionCachingEnabledmetastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), EXPECTED_PARTITIONS);
    // Assert that we did not hit cache
    assertEquals(client.getAccessCount(), 2);

    // Select all of the available partitions and load them into the cache
    assertEquals(partitionCachingEnabledmetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, bothPartitions).size(), 2);
    assertEquals(client.getAccessCount(), 3);

    // Now if we fetch any or both of them, they should hit the cache
    assertEquals(partitionCachingEnabledmetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1)).size(), 1);
    assertEquals(partitionCachingEnabledmetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION2)).size(), 1);
    assertEquals(partitionCachingEnabledmetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, bothPartitions).size(), 2);
    assertEquals(client.getAccessCount(), 3);

    // This call should NOT invalidate the partition cache because the partition version is the same as before
    assertEquals(partitionCachingEnabledmetastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), EXPECTED_PARTITIONS);
    assertEquals(client.getAccessCount(), 4);
    assertEquals(partitionCachingEnabledmetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, bothPartitions).size(), 2);
    // Assert that it's a cache hit
    assertEquals(client.getAccessCount(), 4);

    // assertInvalidateCache expects a cache miss on every round, both when the version
    // grows and when it shrinks, so the decrement has to actually subtract.
    Function<Long, Long> versionIncrement = version -> version + 1;
    Function<Long, Long> versionDecrement = version -> version - 1;
    assertInvalidateCache(new MockPartitionMutator(versionIncrement), versionIncrement);
    assertInvalidateCache(new MockPartitionMutator(versionDecrement), versionDecrement);
}
/**
 * Builds a metastore around the given mutator and, for 100 rounds, asserts that both the
 * partition-name lookup and the following partition lookup reach the mock client.
 *
 * @param partitionMutator mutator wired into the bridging metastore
 * @param versionMutator function used to derive the versions of the partition names requested in each round
 */
private void assertInvalidateCache(MockPartitionMutator partitionMutator, Function<Long, Long> versionMutator)
{
    MockHiveMetastoreClient client = new MockHiveMetastoreClient();
    ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("partition-versioning-test-%s")));
    MockHiveMetastore mockHiveMetastore = new MockHiveMetastore(new MockHiveCluster(client));
    InMemoryCachingHiveMetastore cachingMetastore = new InMemoryCachingHiveMetastore(
            new BridgingHiveMetastore(mockHiveMetastore, partitionMutator),
            executor,
            false,
            new Duration(5, TimeUnit.MINUTES),
            new Duration(1, TimeUnit.MINUTES),
            1000,
            true,
            MetastoreCacheScope.PARTITION,
            0.0,
            10_000,
            NOOP_METASTORE_CACHE_STATS);

    for (int round = 0; round < 100; round++) {
        // Every round performs exactly two client accesses: names first, then partitions
        int accessesAfterNames = 2 * round + 1;
        int accessesAfterPartitions = accessesAfterNames + 1;

        assertEquals(cachingMetastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableMap.of()), EXPECTED_PARTITIONS);
        assertEquals(client.getAccessCount(), accessesAfterNames);

        long baseVersion = PARTITION_VERSION + (long) (round - 1);
        PartitionNameWithVersion firstName = new PartitionNameWithVersion(TEST_PARTITION1, Optional.of(versionMutator.apply(baseVersion)));
        PartitionNameWithVersion secondName = new PartitionNameWithVersion(TEST_PARTITION2, Optional.of(versionMutator.apply(baseVersion)));
        assertEquals(cachingMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(firstName, secondName)).size(), 2);
        // Assert that we did not hit cache
        assertEquals(client.getAccessCount(), accessesAfterPartitions);
    }
}
/**
 * Asserts that filtering partition names for a table in {@code BAD_DATABASE} yields an empty result.
 */
// Explicit annotation for consistency with the other tests; the class-level @Test already runs this method.
@Test
public void testInvalidGetPartitionNamesByParts()
{
    assertTrue(metastore.getPartitionNamesByFilter(TEST_METASTORE_CONTEXT, BAD_DATABASE, TEST_TABLE, ImmutableMap.of()).isEmpty());
}
/**
 * Smoke test for a metastore constructed with {@code 100.0} as the ninth constructor argument
 * (presumably the partition cache validation percentage — confirm against the constructor).
 * After warming the cache, each partition is looked up by values and by name; the test passes
 * if none of those calls throws.
 */
@Test
public void testPartitionCacheValidation()
{
    MockHiveMetastoreClient client = new MockHiveMetastoreClient();
    MockHiveCluster cluster = new MockHiveCluster(client);
    ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("partition-versioning-test-%s")));
    MockHiveMetastore mockHiveMetastore = new MockHiveMetastore(cluster);
    PartitionMutator mockPartitionMutator = new MockPartitionMutator(identity());
    InMemoryCachingHiveMetastore partitionCacheVerificationEnabledMetastore = new InMemoryCachingHiveMetastore(
            new BridgingHiveMetastore(mockHiveMetastore, mockPartitionMutator),
            executor,
            false,
            new Duration(5, TimeUnit.MINUTES),
            new Duration(1, TimeUnit.MINUTES),
            1000,
            true,
            MetastoreCacheScope.PARTITION,
            100.0,
            10_000,
            NOOP_METASTORE_CACHE_STATS);

    // Warmup the cache
    partitionCacheVerificationEnabledMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1, TEST_PARTITION_NAME_WITH_VERSION2));

    // Each of the following calls will be a cache hit and verification will be done.
    // Partition 1: lookup by values, then by name
    partitionCacheVerificationEnabledMetastore.getPartition(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, TEST_PARTITION_VALUES1);
    partitionCacheVerificationEnabledMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1));
    // Partition 2: lookup by values, then by name (the name lookup must target partition 2 as well)
    partitionCacheVerificationEnabledMetastore.getPartition(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, TEST_PARTITION_VALUES2);
    partitionCacheVerificationEnabledMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION2));
}
/**
 * Builds a metastore whose cached partition column count limit is 1 and asserts, via the mock
 * client's access count, that every partition lookup reaches the client.
 */
@Test
public void testPartitionCacheColumnCountLimit()
{
    MockHiveMetastoreClient client = new MockHiveMetastoreClient();
    ListeningExecutorService executor = listeningDecorator(newCachedThreadPool(daemonThreadsNamed("partition-versioning-test-%s")));
    MockHiveMetastore mockHiveMetastore = new MockHiveMetastore(new MockHiveCluster(client));
    PartitionMutator mockPartitionMutator = new MockPartitionMutator(identity());
    InMemoryCachingHiveMetastore limitedMetastore = new InMemoryCachingHiveMetastore(
            new BridgingHiveMetastore(mockHiveMetastore, mockPartitionMutator),
            executor,
            false,
            new Duration(5, TimeUnit.MINUTES),
            new Duration(1, TimeUnit.MINUTES),
            1000,
            true,
            MetastoreCacheScope.PARTITION,
            0.0,
            // set the cached partition column count limit as 1 for testing purpose
            1,
            NOOP_METASTORE_CACHE_STATS);

    ImmutableList<PartitionNameWithVersion> bothPartitions = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1, TEST_PARTITION_NAME_WITH_VERSION2);
    ImmutableList<PartitionNameWithVersion> firstPartitionOnly = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1);

    // Select all of the available partitions. Normally they would have been loaded into the cache,
    // but because of the column count limit they will not be cached
    assertEquals(limitedMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, bothPartitions).size(), 2);
    assertEquals(client.getAccessCount(), 1);

    // The client is used to fetch data since it's a cache miss
    assertEquals(limitedMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, firstPartitionOnly).size(), 1);
    assertEquals(client.getAccessCount(), 2);

    // Repeating the same lookup is a cache miss again
    assertEquals(limitedMetastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, firstPartitionOnly).size(), 1);
    assertEquals(client.getAccessCount(), 3);
}
/**
 * Uses the mock client's access count to check that {@code getPartitionsByNames} caches
 * partitions individually, fetches only the missing ones, and reloads only the partition that
 * was explicitly invalidated.
 */
@Test
public void testGetPartitionsByNames()
{
    ImmutableList<PartitionNameWithVersion> firstOnly = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1);
    ImmutableList<PartitionNameWithVersion> secondOnly = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION2);
    ImmutableList<PartitionNameWithVersion> both = ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1, TEST_PARTITION_NAME_WITH_VERSION2);

    assertEquals(mockClient.getAccessCount(), 0);
    metastore.getTable(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE);
    assertEquals(mockClient.getAccessCount(), 1);

    // Select half of the available partitions and load them into the cache
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, firstOnly).size(), 1);
    assertEquals(mockClient.getAccessCount(), 2);

    // Now select all of the partitions
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, both).size(), 2);
    // There should be one more access to fetch the remaining partition
    assertEquals(mockClient.getAccessCount(), 3);

    // Now if we fetch any or both of them, they should not hit the client
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, firstOnly).size(), 1);
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, secondOnly).size(), 1);
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, both).size(), 2);
    assertEquals(mockClient.getAccessCount(), 3);

    metastore.invalidateAll();

    // Fetching both should only result in one batched access
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, both).size(), 2);
    assertEquals(mockClient.getAccessCount(), 4);

    // Test invalidate a specific partition
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, ImmutableList.of("key"), ImmutableList.of("testpartition1"));

    // The untouched partition should still be a cache hit
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, secondOnly).size(), 1);
    assertEquals(mockClient.getAccessCount(), 4);

    // The invalidated partition should be a cache miss
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, firstOnly).size(), 1);
    assertEquals(mockClient.getAccessCount(), 5);

    // With both loaded again this should be a cache hit
    assertEquals(metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE, both).size(), 2);
    assertEquals(mockClient.getAccessCount(), 5);
}
/**
 * Uses the mock client's access count to check that {@code listRoles} is cached and that
 * {@code invalidateAll}, {@code createRole} and {@code dropRole} each cause the next
 * {@code listRoles} call to reach the client again.
 */
@Test
public void testListRoles()
        throws Exception
{
    assertEquals(mockClient.getAccessCount(), 0);

    // First lookup reaches the client; the repeated lookup must not
    assertEquals(metastore.listRoles(TEST_METASTORE_CONTEXT), TEST_ROLES);
    assertEquals(mockClient.getAccessCount(), 1);
    assertEquals(metastore.listRoles(TEST_METASTORE_CONTEXT), TEST_ROLES);
    assertEquals(mockClient.getAccessCount(), 1);

    // Clearing everything forces a reload
    metastore.invalidateAll();
    assertEquals(metastore.listRoles(TEST_METASTORE_CONTEXT), TEST_ROLES);
    assertEquals(mockClient.getAccessCount(), 2);

    // After creating a role the next listing reaches the client again
    metastore.createRole(TEST_METASTORE_CONTEXT, "role", "grantor");
    assertEquals(metastore.listRoles(TEST_METASTORE_CONTEXT), TEST_ROLES);
    assertEquals(mockClient.getAccessCount(), 3);

    // After dropping a role the next listing reaches the client again
    metastore.dropRole(TEST_METASTORE_CONTEXT, "testrole");
    assertEquals(metastore.listRoles(TEST_METASTORE_CONTEXT), TEST_ROLES);
    assertEquals(mockClient.getAccessCount(), 4);
}
/**
 * Asserts that requesting one partition of a table in {@code BAD_DATABASE} returns a map with
 * exactly one entry whose value is an empty {@code Optional}.
 */
// Explicit annotation for consistency with the other tests; the class-level @Test already runs this method.
@Test
public void testInvalidGetPartitionsByNames()
{
    Map<String, Optional<Partition>> partitionsByNames = metastore.getPartitionsByNames(TEST_METASTORE_CONTEXT, BAD_DATABASE, TEST_TABLE, ImmutableList.of(TEST_PARTITION_NAME_WITH_VERSION1));
    assertEquals(partitionsByNames.size(), 1);

    // The single requested name must map to "no such partition"
    Optional<Partition> onlyElement = Iterables.getOnlyElement(partitionsByNames.values());
    assertFalse(onlyElement.isPresent());
}
/**
 * Makes the mock client throw and asserts that each {@code getAllDatabases} attempt reaches
 * the client again, i.e. the failure itself is not cached.
 */
@Test
public void testNoCacheExceptions()
{
    // Throw exceptions on usage
    mockClient.setThrowException(true);

    // Two consecutive attempts: the second one should hit the client again
    for (int expectedAccessCount = 1; expectedAccessCount <= 2; expectedAccessCount++) {
        try {
            metastore.getAllDatabases(TEST_METASTORE_CONTEXT);
        }
        catch (RuntimeException ignored) {
            // only the access count below matters here
        }
        assertEquals(mockClient.getAccessCount(), expectedAccessCount);
    }
}
/**
 * Checks the constraints returned for {@code TEST_TABLE_WITH_CONSTRAINTS} and uses the mock
 * client's access count to check that they are cached per table: a first load costs three
 * client accesses, and only invalidating that table (or everything) triggers a reload.
 */
@Test
public void testTableConstraints()
{
    assertEquals(mockClient.getAccessCount(), 0);

    // First load: primary key, unique and not-null constraints, in that order
    List<TableConstraint<String>> constraints = metastore.getTableConstraints(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS);
    assertEquals(constraints.get(0), new PrimaryKeyConstraint<>(Optional.of("pk"), new LinkedHashSet<>(ImmutableList.of("c1")), true, true, false));
    assertEquals(constraints.get(1), new UniqueConstraint<>(Optional.of("uk"), new LinkedHashSet<>(ImmutableList.of("c2")), true, true, false));
    assertEquals(constraints.get(2), new NotNullConstraint<>("c3"));
    assertEquals(mockClient.getAccessCount(), 3);

    // The repeated lookup must not reach the client
    metastore.getTableConstraints(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS);
    assertEquals(mockClient.getAccessCount(), 3);

    // Clearing everything forces a full reload
    metastore.invalidateAll();
    metastore.getTableConstraints(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS);
    assertEquals(mockClient.getAccessCount(), 6);

    // Invalidating TEST_TABLE should not affect any entries linked to TEST_TABLE_WITH_CONSTRAINTS
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE);
    metastore.getTableConstraints(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS);
    assertEquals(mockClient.getAccessCount(), 6);

    // Invalidating TEST_TABLE_WITH_CONSTRAINTS itself forces a reload
    metastore.invalidateCache(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS);
    metastore.getTableConstraints(TEST_METASTORE_CONTEXT, TEST_DATABASE, TEST_TABLE_WITH_CONSTRAINTS);
    assertEquals(mockClient.getAccessCount(), 9);
}
/**
 * {@link HiveCluster} for tests that always hands out the single mock client it was built with.
 */
public static class MockHiveCluster
        implements HiveCluster
{
    private final MockHiveMetastoreClient metastoreClient;

    private MockHiveCluster(MockHiveMetastoreClient client)
    {
        metastoreClient = client;
    }

    /**
     * Returns the wrapped mock client; the token is ignored.
     */
    @Override
    public HiveMetastoreClient createMetastoreClient(Optional<String> token)
    {
        return metastoreClient;
    }

    /**
     * Returns the same wrapped mock client, typed as {@link MockHiveMetastoreClient}.
     */
    public MockHiveMetastoreClient createPartitionVersionSupportedMetastoreClient()
    {
        return metastoreClient;
    }
}
}