TestIcebergDistributedHive.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg.hive;
import com.facebook.presto.Session;
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.transaction.TransactionId;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.iceberg.IcebergCatalogName;
import com.facebook.presto.iceberg.IcebergDistributedTestBase;
import com.facebook.presto.iceberg.IcebergHiveMetadata;
import com.facebook.presto.iceberg.IcebergHiveTableOperationsConfig;
import com.facebook.presto.iceberg.IcebergUtil;
import com.facebook.presto.iceberg.ManifestFileCache;
import com.facebook.presto.metadata.CatalogManager;
import com.facebook.presto.metadata.CatalogMetadata;
import com.facebook.presto.metadata.MetadataUtil;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.TableHandle;
import com.facebook.presto.spi.connector.classloader.ClassLoaderSafeConnectorMetadata;
import com.google.common.base.Joiner;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheStats;
import com.google.common.collect.ImmutableMap;
import org.apache.iceberg.Table;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import static com.facebook.presto.hive.metastore.InMemoryCachingHiveMetastore.memoizeMetastore;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.IcebergQueryRunner.ICEBERG_CATALOG;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.NUMBER_OF_DISTINCT_VALUES;
import static com.facebook.presto.spi.statistics.ColumnStatisticType.TOTAL_SIZE_IN_BYTES;
import static java.lang.String.format;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@Test
public class TestIcebergDistributedHive
extends IcebergDistributedTestBase
{
public TestIcebergDistributedHive()
{
super(HIVE, ImmutableMap.of("iceberg.hive-statistics-merge-strategy", Joiner.on(",").join(NUMBER_OF_DISTINCT_VALUES.name(), TOTAL_SIZE_IN_BYTES.name())));
}
@Override
public void testNDVsAtSnapshot()
{
// ignore because HMS doesn't support statistics versioning
}
@Override
public void testStatsByDistance()
{
// ignore because HMS doesn't support statistics versioning
}
@Override
public void testPartShowStatsWithFilters()
{
// Hive doesn't support returning statistics on partitioned tables
}
@Override
public void testStatisticsFileCache()
throws Exception
{
// hive doesn't write Iceberg statistics files when metastore is in use,
// so this test won't complete successfully.
}
@Test
public void testManifestFileCaching()
throws Exception
{
String catalogName = "iceberg_manifest_caching";
Map<String, String> catalogProperties = new HashMap<>(this.icebergQueryRunner.getIcebergCatalogs().get("iceberg"));
catalogProperties.put("iceberg.io.manifest.cache-enabled", "true");
getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties);
Session session = Session.builder(getSession())
.setCatalog(catalogName)
.setSchema("default")
.build();
assertQuerySucceeds(session, "CREATE SCHEMA IF NOT EXISTS default");
assertQuerySucceeds(session, "CREATE TABLE test_manifest_file_cache(i int)");
TransactionId transactionId = getQueryRunner().getTransactionManager().beginTransaction(false);
Session txnSession = Session.builder(session)
.setTransactionId(transactionId)
.build();
Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(txnSession,
getQueryRunner().getTransactionManager(),
QualifiedObjectName.valueOf(txnSession.getCatalog().get(), txnSession.getSchema().get(), "test_manifest_file_cache"),
Optional.empty());
CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager()
.getCatalogMetadata(txnSession.getTransactionId().get(), handle.get().getConnectorId());
Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate");
delegate.setAccessible(true);
IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId()));
ManifestFileCache manifestFileCache = metadata.getManifestFileCache();
assertUpdate(session, "INSERT INTO test_manifest_file_cache VALUES 1, 2, 3, 4, 5", 5);
manifestFileCache.invalidateAll();
assertEquals(manifestFileCache.size(), 0);
CacheStats initial = manifestFileCache.stats();
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
CacheStats firstQuery = manifestFileCache.stats();
assertTrue(firstQuery.minus(initial).missCount() > 0);
assertTrue(manifestFileCache.size() > 0);
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
CacheStats secondQuery = manifestFileCache.stats();
assertEquals(secondQuery.minus(firstQuery).missCount(), 0);
assertTrue(secondQuery.minus(firstQuery).hitCount() > 0);
assertTrue(manifestFileCache.size() > 0);
//test invalidate_manifest_file_cache procedure
assertQuerySucceeds(session, format("CALL %s.system.invalidate_manifest_file_cache()", catalogName));
assertTrue(manifestFileCache.size() == 0);
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache group by i");
CacheStats thirdQuery = manifestFileCache.stats();
assertTrue(secondQuery.missCount() < thirdQuery.missCount());
getQueryRunner().getTransactionManager().asyncAbort(transactionId);
assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache");
assertQuerySucceeds(session, "DROP SCHEMA default");
}
@Test
public void testManifestFileCachingDisabled()
throws Exception
{
String catalogName = "iceberg_no_manifest_caching";
Map<String, String> catalogProperties = new HashMap<>(this.icebergQueryRunner.getIcebergCatalogs().get("iceberg"));
catalogProperties.put("iceberg.io.manifest.cache-enabled", "false");
getQueryRunner().createCatalog(catalogName, "iceberg", catalogProperties);
Session session = Session.builder(getSession())
.setCatalog(catalogName)
.setSchema("default")
.build();
assertQuerySucceeds(session, "CREATE SCHEMA IF NOT EXISTS default");
assertQuerySucceeds(session, "CREATE TABLE test_manifest_file_cache_disabled(i int)");
assertUpdate(session, "INSERT INTO test_manifest_file_cache_disabled VALUES 1, 2, 3, 4, 5", 5);
TransactionId transactionId = getQueryRunner().getTransactionManager().beginTransaction(false);
Session metadataSession = Session.builder(session)
.setTransactionId(transactionId)
.build();
Optional<TableHandle> handle = MetadataUtil.getOptionalTableHandle(metadataSession,
getQueryRunner().getTransactionManager(),
QualifiedObjectName.valueOf(metadataSession.getCatalog().get(), metadataSession.getSchema().get(), "test_manifest_file_cache_disabled"),
Optional.empty());
CatalogMetadata catalogMetadata = getQueryRunner().getTransactionManager()
.getCatalogMetadata(metadataSession.getTransactionId().get(), handle.get().getConnectorId());
Field delegate = ClassLoaderSafeConnectorMetadata.class.getDeclaredField("delegate");
delegate.setAccessible(true);
IcebergHiveMetadata metadata = (IcebergHiveMetadata) delegate.get(catalogMetadata.getMetadataFor(handle.get().getConnectorId()));
ManifestFileCache manifestFileCache = metadata.getManifestFileCache();
assertFalse(manifestFileCache.isEnabled());
CacheStats initial = manifestFileCache.stats();
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i");
CacheStats firstQuery = manifestFileCache.stats();
assertEquals(firstQuery.minus(initial).hitCount(), 0);
assertEquals(manifestFileCache.size(), 0);
assertQuerySucceeds(session, "SELECT count(*) from test_manifest_file_cache_disabled group by i");
CacheStats secondQuery = manifestFileCache.stats();
assertEquals(secondQuery.minus(firstQuery).hitCount(), 0);
assertEquals(manifestFileCache.size(), 0);
getQueryRunner().getTransactionManager().asyncAbort(transactionId);
assertQuerySucceeds(session, "DROP TABLE test_manifest_file_cache_disabled");
assertQuerySucceeds(session, "DROP SCHEMA default");
}
@Override
protected Table loadTable(String tableName)
{
tableName = normalizeIdentifier(tableName, ICEBERG_CATALOG);
CatalogManager catalogManager = getDistributedQueryRunner().getCoordinator().getCatalogManager();
ConnectorId connectorId = catalogManager.getCatalog(ICEBERG_CATALOG).get().getConnectorId();
return IcebergUtil.getHiveIcebergTable(getFileHiveMetastore(),
getHdfsEnvironment(),
new IcebergHiveTableOperationsConfig(),
new ManifestFileCache(CacheBuilder.newBuilder().build(), false, 0, 1024 * 1024),
getQueryRunner().getDefaultSession().toConnectorSession(connectorId),
new IcebergCatalogName(ICEBERG_CATALOG),
SchemaTableName.valueOf("tpch." + tableName));
}
protected ExtendedHiveMetastore getFileHiveMetastore()
{
IcebergFileHiveMetastore fileHiveMetastore = new IcebergFileHiveMetastore(getHdfsEnvironment(),
getCatalogDirectory().toString(),
"test");
return memoizeMetastore(fileHiveMetastore, false, 1000, 0);
}
}