AbstractDeltaDistributedQueryTestBase.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.delta;
import com.facebook.presto.Session;
import com.facebook.presto.hive.HivePlugin;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.AbstractTestQueryFramework;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import org.testng.ITest;
import org.testng.annotations.AfterClass;
import org.testng.annotations.DataProvider;

import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.util.Map;

import static com.facebook.presto.common.type.TimeZoneKey.UTC_KEY;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static java.lang.String.format;
import static java.util.Locale.US;
import static java.util.Objects.requireNonNull;
/**
 * Base class for Delta connector tests that run against a {@link DistributedQueryRunner}.
 * <p>
 * {@link #createQueryRunner()} starts a query runner with the {@code tpch}, {@code delta} and
 * {@code hive} catalogs (the latter two share one file-based metastore directory) and registers
 * every golden table listed in this class, once per Delta reader version, under
 * {@link #DELTA_SCHEMA}. {@link #deleteTestDeltaTables()} drops those tables again after the
 * test class has run.
 */
public abstract class AbstractDeltaDistributedQueryTestBase
        extends AbstractTestQueryFramework implements ITest
{
    public static final String DELTA_CATALOG = "delta";
    public static final String HIVE_CATALOG = "hive";
    public static final String PATH_SCHEMA = "$path$";
    public static final String DELTA_SCHEMA = "deltaTables"; // Schema in Hive which has test Delta tables

    protected static final String DELTA_V1 = "delta_v1";
    protected static final String DELTA_V3 = "delta_v3";
    protected static final String[] DELTA_VERSIONS = {DELTA_V1, DELTA_V3};

    /**
     * Base names of the golden tables; each one is looked up on the classpath below a
     * per-version directory (see {@link #getVersionPrefix(String)}).
     */
    private static final String[] DELTA_TEST_TABLE_NAMES_LIST = {
            "data-reader-primitives",
            "data-reader-array-primitives",
            "data-reader-map",
            "snapshot-data3",
            "checkpointed-delta-table",
            "time-travel-partition-changes-b",
            "deltatbl-partition-prune",
            "data-reader-partition-values",
            "data-reader-nested-struct",
            "test-lowercase",
            "test-partitions-lowercase",
            "test-uppercase",
            "test-partitions-uppercase",
            "test-typing"
    };

    /**
     * Version-qualified names of all golden tables: every entry of
     * {@link #DELTA_TEST_TABLE_NAMES_LIST} repeated for every entry of {@link #DELTA_VERSIONS},
     * grouped by version.
     */
    private static final String[] DELTA_TEST_TABLE_LIST = buildVersionedTableList();

    // NOTE(review): nothing in this class ever assigns a value to this ThreadLocal, and it is
    // private, so getTestName() returns null as far as this file shows — confirm whether a
    // setter (e.g. a @BeforeMethod hook) is missing.
    private final ThreadLocal<String> testName = new ThreadLocal<>();

    /**
     * Expands the table base names into {@code <version><separator><table>} entries, iterating
     * versions in the outer loop so that all tables of one version are contiguous.
     */
    private static String[] buildVersionedTableList()
    {
        String[] versionedTables = new String[DELTA_VERSIONS.length * DELTA_TEST_TABLE_NAMES_LIST.length];
        int next = 0;
        for (String version : DELTA_VERSIONS) {
            for (String tableName : DELTA_TEST_TABLE_NAMES_LIST) {
                versionedTables[next++] = getVersionPrefix(version) + tableName;
            }
        }
        return versionedTables;
    }

    /**
     * TestNG data provider yielding one row per supported Delta reader version.
     */
    @DataProvider
    protected static Object[][] deltaReaderVersions()
    {
        return new Object[][] {{DELTA_V1}, {DELTA_V3}};
    }

    /**
     * Returns the name stored for the current thread, or {@code null} when none has been set.
     */
    @Override
    public String getTestName()
    {
        return this.testName.get();
    }

    /**
     * Returns {@code version} followed by the default file system's name separator; this is the
     * prefix under which the tables of that reader version are registered.
     */
    protected static String getVersionPrefix(String version)
    {
        return version + FileSystems.getDefault().getSeparator();
    }

    /**
     * Starts the Delta query runner and registers every golden table in the metastore under its
     * version-qualified name.
     * <p>
     * If registration fails, the already-started query runner is closed before the failure is
     * rethrown, so a failed setup does not leave a running server behind.
     */
    @Override
    protected QueryRunner createQueryRunner()
            throws Exception
    {
        QueryRunner queryRunner = createDeltaQueryRunner(ImmutableMap.of(
                "experimental.pushdown-subfields-enabled", "true",
                "experimental.pushdown-dereference-enabled", "true"));

        try {
            // Create the test Delta tables in HMS
            for (String deltaTestTable : DELTA_TEST_TABLE_LIST) {
                registerDeltaTableInHMS(queryRunner, deltaTestTable, deltaTestTable);
            }
            return queryRunner;
        }
        catch (Throwable e) {
            closeSuppressing(e, queryRunner);
            throw e;
        }
    }

    /**
     * Drops every table registered by {@link #createQueryRunner()} once the test class is done.
     */
    @AfterClass
    public void deleteTestDeltaTables()
    {
        QueryRunner queryRunner = getQueryRunner();
        if (queryRunner != null) {
            // Remove the test Delta tables from HMS
            for (String deltaTestTable : DELTA_TEST_TABLE_LIST) {
                unregisterDeltaTableInHMS(queryRunner, deltaTestTable);
            }
        }
    }

    /**
     * Resolves a golden table directory on the classpath and returns its URL as a string.
     *
     * @param tableName resource name of the table; may use the platform file separator, which is
     * translated to {@code '/'} because class loader resource names are always '/'-separated
     * @return string form of the resource URL
     * @throws NullPointerException if {@code tableName} is null or no such resource exists
     */
    protected static String goldenTablePath(String tableName)
    {
        requireNonNull(tableName, "tableName is null");
        // No-op where the platform separator already is '/'.
        String resourceName = tableName.replace(FileSystems.getDefault().getSeparator(), "/");
        return requireNonNull(
                AbstractDeltaDistributedQueryTestBase.class.getClassLoader().getResource(resourceName),
                () -> format("Delta test table '%s' not found on the classpath", tableName))
                .toString();
    }

    /**
     * Same as {@link #goldenTablePath(String)} for the table {@code tableName} located below the
     * directory {@code prefix} (typically one of {@link #DELTA_VERSIONS}).
     */
    protected static String goldenTablePathWithPrefix(String prefix, String tableName)
    {
        return goldenTablePath(getVersionPrefix(prefix) + tableName);
    }

    /**
     * Builds a {@link DistributedQueryRunner} whose default catalog/schema is the Delta test
     * schema and which has the {@code tpch}, {@code delta} and {@code hive} catalogs installed.
     * The runner is closed again if any setup step after its construction fails.
     *
     * @param extraProperties additional properties passed to the query runner builder
     */
    private static DistributedQueryRunner createDeltaQueryRunner(Map<String, String> extraProperties)
            throws Exception
    {
        Session session = testSessionBuilder()
                .setCatalog(DELTA_CATALOG)
                .setSchema(DELTA_SCHEMA.toLowerCase(US))
                .setTimeZoneKey(UTC_KEY)
                .build();

        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
                .setExtraProperties(extraProperties)
                .build();

        try {
            // Install the TPCH plugin for test data (not in Delta format)
            queryRunner.installPlugin(new TpchPlugin());
            queryRunner.createCatalog("tpch", "tpch");

            Path dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("delta_metadata");
            Path catalogDirectory = dataDirectory.getParent().resolve("catalog");
            // Both connectors below point at this one directory, i.e. they share a metastore.
            String metastoreCatalogDir = catalogDirectory.toFile().toURI().toString();

            // Install a Delta connector catalog
            queryRunner.installPlugin(new DeltaPlugin());
            Map<String, String> deltaProperties = ImmutableMap.<String, String>builder()
                    .put("hive.metastore", "file")
                    .put("hive.metastore.catalog.dir", metastoreCatalogDir)
                    .put("delta.case-sensitive-partitions-enabled", "false")
                    .build();
            queryRunner.createCatalog(DELTA_CATALOG, "delta", deltaProperties);

            // Install a Hive connector catalog that uses the same metastore as Delta.
            // In this class it is used to create the test schema; the test tables themselves
            // are registered through the Delta catalog (see registerDeltaTableInHMS).
            queryRunner.installPlugin(new HivePlugin("hive"));
            Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
                    .put("hive.metastore", "file")
                    .put("hive.metastore.catalog.dir", metastoreCatalogDir)
                    .put("hive.allow-drop-table", "true")
                    .put("hive.security", "legacy")
                    .build();
            queryRunner.createCatalog(HIVE_CATALOG, "hive", hiveProperties);

            queryRunner.execute(format("CREATE SCHEMA %s.%s", HIVE_CATALOG, DELTA_SCHEMA));

            return queryRunner;
        }
        catch (Throwable e) {
            closeSuppressing(e, queryRunner);
            throw e;
        }
    }

    /**
     * Closes {@code queryRunner} while a setup failure is being propagated; a failure of the
     * close itself is attached to {@code failure} as a suppressed exception instead of masking it.
     */
    private static void closeSuppressing(Throwable failure, QueryRunner queryRunner)
    {
        try {
            queryRunner.close();
        }
        catch (Exception closeFailure) {
            // Throwable.addSuppressed rejects self-suppression
            if (closeFailure != failure) {
                failure.addSuppressed(closeFailure);
            }
        }
    }

    /**
     * Register the given <i>deltaTableName</i> as <i>hiveTableName</i> in HMS by issuing a
     * {@code CREATE TABLE ... WITH (external_location = ...)} statement through the Delta catalog.
     * Hive and Delta catalogs share the same HMS in this test.
     *
     * @param queryRunner Query runner used to execute the statement.
     * @param deltaTableName Name of the delta table which is on the classpath.
     * @param hiveTableName Name of the Hive table that the Delta table is to be registered as in HMS
     */
    protected static void registerDeltaTableInHMS(QueryRunner queryRunner, String deltaTableName, String hiveTableName)
    {
        // NOTE(review): the single dummyColumn is presumably a placeholder because the real
        // schema comes from the Delta log at external_location — confirm.
        queryRunner.execute(format(
                "CREATE TABLE %s.\"%s\".\"%s\" (dummyColumn INT) WITH (external_location = '%s')",
                DELTA_CATALOG,
                DELTA_SCHEMA,
                hiveTableName,
                goldenTablePath(deltaTableName)));
    }

    /**
     * Drop the given table from HMS; a table that does not exist is ignored
     * ({@code DROP TABLE IF EXISTS}).
     */
    private static void unregisterDeltaTableInHMS(QueryRunner queryRunner, String hiveTableName)
    {
        queryRunner.execute(format("DROP TABLE IF EXISTS %s.\"%s\".\"%s\"", DELTA_CATALOG, DELTA_SCHEMA, hiveTableName));
    }
}