// HiveQueryRunner.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.hive;
import com.facebook.airlift.log.Logger;
import com.facebook.airlift.log.Logging;
import com.facebook.presto.Session;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.connector.jmx.JmxPlugin;
import com.facebook.presto.execution.QueryManagerConfig.ExchangeMaterializationStrategy;
import com.facebook.presto.hive.TestHiveEventListenerPlugin.TestingHiveEventListenerPlugin;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.hive.metastore.file.FileHiveMetastore;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.security.Identity;
import com.facebook.presto.spi.security.PrincipalType;
import com.facebook.presto.spi.security.SelectedRole;
import com.facebook.presto.spi.statistics.HistoryBasedPlanStatisticsProvider;
import com.facebook.presto.testing.InMemoryHistoryBasedPlanStatisticsProvider;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tests.tpcds.TpcdsTableName;
import com.facebook.presto.tpcds.TpcdsPlugin;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.tpch.TpchTable;
import org.joda.time.DateTimeZone;
import java.io.File;
import java.net.URI;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import static com.facebook.airlift.log.Level.ERROR;
import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.SystemSessionProperties.COLOCATED_JOIN;
import static com.facebook.presto.SystemSessionProperties.EXCHANGE_MATERIALIZATION_STRATEGY;
import static com.facebook.presto.SystemSessionProperties.GROUPED_EXECUTION;
import static com.facebook.presto.SystemSessionProperties.HASH_PARTITION_COUNT;
import static com.facebook.presto.SystemSessionProperties.PARTITIONING_PROVIDER_CATALOG;
import static com.facebook.presto.hive.HiveTestUtils.getDataDirectoryPath;
import static com.facebook.presto.spi.security.SelectedRole.Type.ROLE;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.facebook.presto.tests.QueryAssertions.copyTables;
import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static java.util.Locale.ENGLISH;
import static org.testng.Assert.assertEquals;
public final class HiveQueryRunner
{
private static final Logger log = Logger.get(HiveQueryRunner.class);
private HiveQueryRunner()
{
}
public static final String HIVE_CATALOG = "hive";
public static final String HIVE_BUCKETED_CATALOG = "hive_bucketed";
public static final String TPCH_SCHEMA = "tpch";
public static final String TPCH_BUCKETED_SCHEMA = "tpch_bucketed";
public static final String TPCDS_SCHEMA = "tpcds";
public static final String TPCDS_BUCKETED_SCHEMA = "tpcds_bucketed";
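    // Metastore context shared by the direct metastore calls used to create the test schemas below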
public static final MetastoreContext METASTORE_CONTEXT = new MetastoreContext("test_user", "test_queryId", Optional.empty(), Collections.emptySet(), Optional.empty(), Optional.empty(), false, HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER, WarningCollector.NOOP, new RuntimeStats());
private static final String TEMPORARY_TABLE_SCHEMA = "__temporary_tables__";
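    // A non-UTC zone is used on purpose; createQueryRunner asserts that the JVM was started with -Duser.timezone=America/Bahia_Banderas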
private static final DateTimeZone TIME_ZONE = DateTimeZone.forID("America/Bahia_Banderas");
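    // The overloads below delegate to the full createQueryRunner overload with default values. Typical test usage (a minimal sketch):
    //   DistributedQueryRunner queryRunner = HiveQueryRunner.createQueryRunner(TpchTable.getTables());
    //   ... run queries against the hive.tpch catalog and schema ...
    //   queryRunner.close();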
public static DistributedQueryRunner createQueryRunner(TpchTable<?>... tables)
throws Exception
{
return createQueryRunner(ImmutableList.copyOf(tables));
}
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tables)
throws Exception
{
return createQueryRunner(tables, ImmutableMap.of(), Optional.empty());
}
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tpchTables, Map<String, String> extraProperties, Map<String, String> tpcdsProperties)
throws Exception
{
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), Optional.empty(), Optional.empty(), tpcdsProperties);
}
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Map<String, String> extraProperties,
Map<String, String> extraCoordinatorProperties,
Optional<Path> dataDirectory)
throws Exception
{
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, extraCoordinatorProperties, "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tpchTables, Map<String, String> extraProperties, Optional<Path> dataDirectory)
throws Exception
{
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}
public static DistributedQueryRunner createQueryRunner(Iterable<TpchTable<?>> tpchTables, List<String> tpcdsTableNames, Map<String, String> extraProperties, Optional<Path> dataDirectory)
throws Exception
{
return createQueryRunner(tpchTables, tpcdsTableNames, extraProperties, ImmutableMap.of(), "sql-standard", ImmutableMap.of(), Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Map<String, String> extraProperties,
String security,
Map<String, String> extraHiveProperties,
Optional<Path> dataDirectory)
throws Exception
{
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), security, extraHiveProperties, Optional.empty(), dataDirectory, Optional.empty(), ImmutableMap.of());
}
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Map<String, String> extraProperties,
String security,
Map<String, String> extraHiveProperties,
Optional<Path> dataDirectory,
Map<String, String> tpcdsProperties)
throws Exception
{
return createQueryRunner(tpchTables, ImmutableList.of(), extraProperties, ImmutableMap.of(), security, extraHiveProperties, Optional.empty(), dataDirectory, Optional.empty(), tpcdsProperties);
}
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Iterable<String> tpcdsTableNames,
Map<String, String> extraProperties,
Map<String, String> extraCoordinatorProperties,
String security,
Map<String, String> extraHiveProperties,
Optional<Integer> workerCount,
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
Map<String, String> tpcdsProperties)
throws Exception
{
return createQueryRunner(tpchTables, tpcdsTableNames, extraProperties, extraCoordinatorProperties, security, extraHiveProperties, workerCount, dataDirectory, externalWorkerLauncher, Optional.empty(), tpcdsProperties);
}
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Iterable<String> tpcdsTableNames,
Map<String, String> extraProperties,
Map<String, String> extraCoordinatorProperties,
String security,
Map<String, String> extraHiveProperties,
Optional<Integer> workerCount,
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
Optional<ExtendedHiveMetastore> externalMetastore,
Map<String, String> tpcdsProperties)
throws Exception
{
return createQueryRunner(
tpchTables,
tpcdsTableNames,
extraProperties,
extraCoordinatorProperties,
security,
extraHiveProperties,
workerCount,
dataDirectory,
externalWorkerLauncher,
externalMetastore,
false,
tpcdsProperties);
}
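    // Full overload: builds the distributed cluster, installs the TPC-H, TPC-DS, and Hive plugins, and loads the requested tables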
public static DistributedQueryRunner createQueryRunner(
Iterable<TpchTable<?>> tpchTables,
Iterable<String> tpcdsTableNames,
Map<String, String> extraProperties,
Map<String, String> extraCoordinatorProperties,
String security,
Map<String, String> extraHiveProperties,
Optional<Integer> workerCount,
Optional<Path> dataDirectory,
Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher,
Optional<ExtendedHiveMetastore> externalMetastore,
boolean addJmxPlugin,
Map<String, String> tpcdsProperties)
throws Exception
{
assertEquals(DateTimeZone.getDefault(), TIME_ZONE, "Timezone not configured correctly. Add -Duser.timezone=America/Bahia_Banderas to your JVM arguments");
setupLogging();
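        // Baseline system properties for every node, combined with the caller-supplied extraProperties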
Map<String, String> systemProperties = ImmutableMap.<String, String>builder()
.put("task.writer-count", "2")
.put("task.partitioned-writer-count", "4")
.put("tracing.tracer-type", "simple")
.put("tracing.enable-distributed-tracing", "simple")
.putAll(extraProperties)
.build();
DistributedQueryRunner queryRunner =
DistributedQueryRunner.builder(createSession(Optional.of(new SelectedRole(ROLE, Optional.of("admin")))))
.setNodeCount(workerCount.orElse(4))
.setExtraProperties(systemProperties)
.setCoordinatorProperties(extraCoordinatorProperties)
.setDataDirectory(dataDirectory)
.setExternalWorkerLauncher(externalWorkerLauncher)
.build();
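        // From here on, close the runner on any failure so the embedded cluster is not leaked (see the catch block below)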
try {
queryRunner.installPlugin(new TpchPlugin());
queryRunner.installPlugin(new TpcdsPlugin());
queryRunner.installPlugin(new TestingHiveEventListenerPlugin());
queryRunner.createCatalog("tpch", "tpch");
queryRunner.createCatalog("tpcds", "tpcds", tpcdsProperties);
Map<String, String> tpchProperties = ImmutableMap.<String, String>builder()
.put("tpch.column-naming", "standard")
.build();
queryRunner.createCatalog("tpchstandard", "tpch", tpchProperties);
            ExtendedHiveMetastore metastore = externalMetastore.orElseGet(() -> getFileHiveMetastore(queryRunner));
queryRunner.installPlugin(new HivePlugin(HIVE_CATALOG, Optional.of(metastore)));
if (addJmxPlugin) {
queryRunner.installPlugin(new JmxPlugin());
queryRunner.createCatalog("jmx", "jmx");
}
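            // Hive catalog configuration: extraHiveProperties first, followed by fixed settings required by the tests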
Map<String, String> hiveProperties = ImmutableMap.<String, String>builder()
.putAll(extraHiveProperties)
.put("hive.time-zone", TIME_ZONE.getID())
.put("hive.security", security)
.put("hive.max-partitions-per-scan", "1000")
.put("hive.assume-canonical-partition-keys", "true")
.put("hive.collect-column-statistics-on-write", "true")
.put("hive.temporary-table-schema", TEMPORARY_TABLE_SCHEMA)
.build();
Map<String, String> storageProperties = extraHiveProperties.containsKey("hive.storage-format") ?
ImmutableMap.copyOf(hiveProperties) :
ImmutableMap.<String, String>builder()
.putAll(hiveProperties)
.put("hive.storage-format", "TEXTFILE")
.put("hive.compression-codec", "NONE")
.build();
            Map<String, String> hiveBucketedProperties = ImmutableMap.<String, String>builder()
                    .putAll(storageProperties)
                    .put("hive.max-initial-split-size", "10kB") // so that each bucket has multiple splits
                    .put("hive.max-split-size", "10kB") // so that each bucket has multiple splits
                    .build();
queryRunner.createCatalog(HIVE_CATALOG, HIVE_CATALOG, hiveProperties);
queryRunner.createCatalog(HIVE_BUCKETED_CATALOG, HIVE_CATALOG, hiveBucketedProperties);
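            // Create each schema and copy its tables only if the metastore does not already contain it, so an existing data directory is reused as-is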
List<String> tpchTableNames = getTpchTableNames(tpchTables);
if (!metastore.getDatabase(METASTORE_CONTEXT, TPCH_SCHEMA).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(TPCH_SCHEMA));
copyTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createSession(Optional.empty()), tpchTableNames, true, false);
}
if (!metastore.getDatabase(METASTORE_CONTEXT, TPCH_BUCKETED_SCHEMA).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(TPCH_BUCKETED_SCHEMA));
copyTables(queryRunner, "tpch", TINY_SCHEMA_NAME, createBucketedSession(Optional.empty()), tpchTableNames, true, true);
}
if (!metastore.getDatabase(METASTORE_CONTEXT, TEMPORARY_TABLE_SCHEMA).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(TEMPORARY_TABLE_SCHEMA));
}
if (!metastore.getDatabase(METASTORE_CONTEXT, TPCDS_SCHEMA).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(TPCDS_SCHEMA));
copyTables(queryRunner, "tpcds", TINY_SCHEMA_NAME, createSession(Optional.empty(), TPCDS_SCHEMA), tpcdsTableNames, true, false);
}
if (!metastore.getDatabase(METASTORE_CONTEXT, TPCDS_BUCKETED_SCHEMA).isPresent()) {
metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(TPCDS_BUCKETED_SCHEMA));
copyTables(queryRunner, "tpcds", TINY_SCHEMA_NAME, createBucketedSession(Optional.empty(), TPCDS_BUCKETED_SCHEMA), tpcdsTableNames, true, true);
}
return queryRunner;
}
catch (Exception e) {
queryRunner.close();
throw e;
}
}
private static List<String> getTpchTableNames(Iterable<TpchTable<?>> tables)
{
ImmutableList.Builder<String> tableNames = ImmutableList.builder();
tables.forEach(table -> tableNames.add(table.getTableName().toLowerCase(ENGLISH)));
return tableNames.build();
}
public static List<String> getAllTpcdsTableNames()
{
ImmutableList.Builder<String> tables = ImmutableList.builder();
for (TpcdsTableName tpcdsTable : TpcdsTableName.getBaseTables()) {
tables.add(tpcdsTable.getTableName().toLowerCase(ENGLISH));
}
return tables.build();
}
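    // Creates a file-based metastore rooted at <data directory>/hive_data, using local HDFS configuration without authentication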
public static ExtendedHiveMetastore getFileHiveMetastore(DistributedQueryRunner queryRunner)
{
File dataDirectory = queryRunner.getCoordinator().getDataDirectory().resolve("hive_data").toFile();
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
return new FileHiveMetastore(hdfsEnvironment, dataDirectory.toURI().toString(), "test");
}
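    // Runner that materializes exchanges into temporary Hive tables (partitioning provider catalog = hive) with colocated joins and grouped execution enabled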
public static DistributedQueryRunner createMaterializingQueryRunner(Iterable<TpchTable<?>> tables)
throws Exception
{
return createQueryRunner(
tables,
ImmutableMap.of(
"query.partitioning-provider-catalog", "hive",
"query.exchange-materialization-strategy", "ALL",
"query.hash-partition-count", "11",
"colocated-joins-enabled", "true",
"grouped-execution-enabled", "true"),
"sql-standard",
ImmutableMap.of("hive.create-empty-bucket-files-for-temporary-table", "false"),
Optional.empty(),
ImmutableMap.of());
}
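    // Same materializing configuration as above, plus spilling to a directory under java.io.tmpdir with memory revoking always triggered (threshold 0.0)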
public static DistributedQueryRunner createMaterializingAndSpillingQueryRunner(Iterable<TpchTable<?>> tables)
throws Exception
{
return createQueryRunner(
tables,
ImmutableMap.<String, String>builder()
.put("query.partitioning-provider-catalog", "hive")
.put("query.exchange-materialization-strategy", "ALL")
.put("query.hash-partition-count", "11")
.put("colocated-joins-enabled", "true")
.put("grouped-execution-enabled", "true")
.put("experimental.spill-enabled", "true")
.put("experimental.spiller-spill-path", Paths.get(System.getProperty("java.io.tmpdir"), "presto", "spills").toString())
.put("experimental.spiller-max-used-space-threshold", "1.0")
.put("experimental.memory-revoking-threshold", "0.0") // revoke always
.put("experimental.memory-revoking-target", "0.0")
.build(),
Optional.empty());
}
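    // Raises log levels for noisy framework and library loggers so test output stays readable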
private static void setupLogging()
{
Logging logging = Logging.initialize();
logging.setLevel("com.facebook.presto.event", WARN);
logging.setLevel("com.facebook.presto.security.AccessControlManager", WARN);
logging.setLevel("com.facebook.presto.server.PluginManager", WARN);
logging.setLevel("com.facebook.airlift.bootstrap.LifeCycleManager", WARN);
logging.setLevel("org.apache.parquet.hadoop", WARN);
logging.setLevel("org.eclipse.jetty.server.handler.ContextHandler", WARN);
logging.setLevel("org.eclipse.jetty.server.AbstractConnector", WARN);
logging.setLevel("org.glassfish.jersey.internal.inject.Providers", ERROR);
logging.setLevel("parquet.hadoop", WARN);
}
public static Database createDatabaseMetastoreObject(String name)
{
return Database.builder()
.setDatabaseName(name)
.setOwnerName("public")
.setOwnerType(PrincipalType.ROLE)
.build();
}
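    // Sessions connect as the "hive" user; the optional role, when present, is applied to the corresponding catalog only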
public static Session createSession(Optional<SelectedRole> role)
{
return createSession(role, TPCH_SCHEMA);
}
public static Session createSession(Optional<SelectedRole> role, String schema)
{
return testSessionBuilder()
.setIdentity(new Identity(
"hive",
Optional.empty(),
role.map(selectedRole -> ImmutableMap.of(HIVE_CATALOG, selectedRole))
.orElse(ImmutableMap.of()),
ImmutableMap.of(),
ImmutableMap.of(),
Optional.empty(),
Optional.empty()))
.setCatalog(HIVE_CATALOG)
.setSchema(schema)
.build();
}
public static Session createBucketedSession(Optional<SelectedRole> role)
{
return createBucketedSession(role, TPCH_BUCKETED_SCHEMA);
}
public static Session createBucketedSession(Optional<SelectedRole> role, String schema)
{
return testSessionBuilder()
.setIdentity(new Identity(
"hive",
Optional.empty(),
role.map(selectedRole -> ImmutableMap.of(HIVE_BUCKETED_CATALOG, selectedRole))
.orElse(ImmutableMap.of()),
ImmutableMap.of(),
ImmutableMap.of(),
Optional.empty(),
Optional.empty()))
.setCatalog(HIVE_BUCKETED_CATALOG)
.setSchema(schema)
.build();
}
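    // Session that enables exchange materialization through system session properties instead of coordinator configuration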
public static Session createMaterializeExchangesSession(Optional<SelectedRole> role)
{
return testSessionBuilder()
.setIdentity(new Identity(
"hive",
Optional.empty(),
role.map(selectedRole -> ImmutableMap.of("hive", selectedRole))
.orElse(ImmutableMap.of()),
ImmutableMap.of(),
ImmutableMap.of(),
Optional.empty(),
Optional.empty()))
.setSystemProperty(PARTITIONING_PROVIDER_CATALOG, HIVE_CATALOG)
.setSystemProperty(EXCHANGE_MATERIALIZATION_STRATEGY, ExchangeMaterializationStrategy.ALL.name())
.setSystemProperty(HASH_PARTITION_COUNT, "13")
.setSystemProperty(COLOCATED_JOIN, "true")
.setSystemProperty(GROUPED_EXECUTION, "true")
.setCatalog(HIVE_CATALOG)
.setSchema(TPCH_SCHEMA)
.build();
}
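    // Starts a local query runner on HTTP port 8080 with all TPC-H and TPC-DS tables loaded; an optional argument sets the data directory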
public static void main(String[] args)
throws Exception
{
// You need to add "--user user" to your CLI for your queries to work
setupLogging();
Optional<Path> dataDirectory;
if (args.length > 0) {
if (args.length != 1) {
log.error("usage: HiveQueryRunner [dataDirectory]\n");
log.error(" [dataDirectory] is a local directory under which you want the hive_data directory to be created.]\n");
System.exit(1);
}
dataDirectory = getDataDirectoryPath(Optional.of(args[0]));
}
else {
dataDirectory = getDataDirectoryPath(Optional.empty());
}
DistributedQueryRunner queryRunner = createQueryRunner(TpchTable.getTables(), getAllTpcdsTableNames(), ImmutableMap.of("http-server.http.port", "8080"), dataDirectory);
try {
queryRunner.installPlugin(new Plugin()
{
@Override
public Iterable<HistoryBasedPlanStatisticsProvider> getHistoryBasedPlanStatisticsProviders()
{
return ImmutableList.of(new InMemoryHistoryBasedPlanStatisticsProvider());
}
});
}
catch (Exception e) {
queryRunner.close();
throw e;
}
Thread.sleep(10);
Logger log = Logger.get(DistributedQueryRunner.class);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}
}