IcebergQueryRunner.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.iceberg;

import com.facebook.airlift.log.Logger;
import com.facebook.airlift.log.Logging;
import com.facebook.presto.Session;
import com.facebook.presto.common.RuntimeStats;
import com.facebook.presto.connector.jmx.JmxPlugin;
import com.facebook.presto.hive.HdfsConfiguration;
import com.facebook.presto.hive.HdfsConfigurationInitializer;
import com.facebook.presto.hive.HdfsEnvironment;
import com.facebook.presto.hive.HiveClientConfig;
import com.facebook.presto.hive.HiveColumnConverterProvider;
import com.facebook.presto.hive.HiveHdfsConfiguration;
import com.facebook.presto.hive.MetastoreClientConfig;
import com.facebook.presto.hive.authentication.NoHdfsAuthentication;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.MetastoreContext;
import com.facebook.presto.iceberg.hive.IcebergFileHiveMetastore;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.facebook.presto.tpcds.TpcdsPlugin;
import com.facebook.presto.tpch.TpchPlugin;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.airlift.tpch.TpchTable;

import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;

import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

import static com.facebook.airlift.log.Level.ERROR;
import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.hive.HiveTestUtils.getDataDirectoryPath;
import static com.facebook.presto.iceberg.CatalogType.HIVE;
import static com.facebook.presto.iceberg.FileFormat.PARQUET;
import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
import static com.facebook.presto.tests.QueryAssertions.copyTpchTables;
import static com.facebook.presto.tpch.TpchMetadata.TINY_SCHEMA_NAME;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;
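
/**
 * Sets up a {@link DistributedQueryRunner} with the Iceberg connector for tests
 * and manual experimentation. A minimal usage sketch (the query is illustrative;
 * TPCH tables are copied into the "tpch" schema by default):
 *
 * <pre>{@code
 * IcebergQueryRunner runner = IcebergQueryRunner.builder()
 *         .setCatalogType(HIVE)
 *         .setFormat(PARQUET)
 *         .build();
 * runner.getQueryRunner().execute("SELECT count(*) FROM tpch.nation");
 * }</pre>
 */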
public final class IcebergQueryRunner
{
private static final Logger log = Logger.get(IcebergQueryRunner.class);
public static final String ICEBERG_CATALOG = "iceberg";
public static final String TEST_DATA_DIRECTORY = "iceberg_data";
public static final MetastoreContext METASTORE_CONTEXT = new MetastoreContext(
        "test_user",
        "test_queryId",
        Optional.empty(),
        Collections.emptySet(),
        Optional.empty(),
        Optional.empty(),
        false,
        HiveColumnConverterProvider.DEFAULT_COLUMN_CONVERTER_PROVIDER,
        WarningCollector.NOOP,
        new RuntimeStats());
private final DistributedQueryRunner queryRunner;
private final Map<String, Map<String, String>> icebergCatalogs;
private IcebergQueryRunner(DistributedQueryRunner queryRunner, Map<String, Map<String, String>> icebergCatalogs)
{
this.queryRunner = requireNonNull(queryRunner, "queryRunner is null");
this.icebergCatalogs = new ConcurrentHashMap<>(requireNonNull(icebergCatalogs, "icebergCatalogs is null"));
}
public DistributedQueryRunner getQueryRunner()
{
return queryRunner;
}
public Map<String, Map<String, String>> getIcebergCatalogs()
{
return icebergCatalogs;
}
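
/**
 * Registers an additional Iceberg catalog on the underlying query runner and
 * records its properties for later retrieval via {@link #getIcebergCatalogs()}.
 * A sketch (the catalog name and warehouse path are illustrative):
 *
 * <pre>{@code
 * runner.addCatalog("iceberg_hadoop", ImmutableMap.of(
 *         "iceberg.catalog.type", "HADOOP",
 *         "iceberg.catalog.warehouse", "file:///tmp/iceberg-warehouse"));
 * }</pre>
 */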
public void addCatalog(String name, Map<String, String> properties)
{
queryRunner.createCatalog(name, "iceberg", properties);
icebergCatalogs.put(name, properties);
}
public static Builder builder()
{
return new Builder();
}
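
/**
 * Builder for {@link IcebergQueryRunner}. Defaults: HIVE catalog type, PARQUET
 * file format, four nodes, TPCH tables created, and the JMX plugin installed.
 */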
public static class Builder
{
private Builder() {}
private CatalogType catalogType = HIVE;
private Map<String, String> extraProperties = new HashMap<>();
private Map<String, String> extraConnectorProperties = new HashMap<>();
private Map<String, String> tpcdsProperties = new HashMap<>();
private FileFormat format = PARQUET;
private boolean createTpchTables = true;
private boolean addJmxPlugin = true;
private OptionalInt nodeCount = OptionalInt.of(4);
private Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher = Optional.empty();
private Optional<Path> dataDirectory = Optional.empty();
private boolean addStorageFormatToPath;
private Optional<String> schemaName = Optional.empty();
public Builder setFormat(FileFormat format)
{
this.format = format;
return this;
}
public Builder setExternalWorkerLauncher(Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher)
{
this.externalWorkerLauncher = externalWorkerLauncher;
return this;
}
public Builder setSchemaName(String schemaName)
{
this.schemaName = Optional.of(schemaName);
return this;
}
public Builder setCreateTpchTables(boolean createTpchTables)
{
this.createTpchTables = createTpchTables;
return this;
}
public Builder setAddJmxPlugin(boolean addJmxPlugin)
{
this.addJmxPlugin = addJmxPlugin;
return this;
}
public Builder setNodeCount(OptionalInt nodeCount)
{
this.nodeCount = nodeCount;
return this;
}
public Builder setExtraProperties(Map<String, String> extraProperties)
{
this.extraProperties = extraProperties;
return this;
}
public Builder setCatalogType(CatalogType catalogType)
{
this.catalogType = catalogType;
return this;
}
public Builder setDataDirectory(Optional<Path> dataDirectory)
{
this.dataDirectory = dataDirectory;
return this;
}
public Builder setExtraConnectorProperties(Map<String, String> extraConnectorProperties)
{
this.extraConnectorProperties = extraConnectorProperties;
return this;
}
public Builder setAddStorageFormatToPath(boolean addStorageFormatToPath)
{
this.addStorageFormatToPath = addStorageFormatToPath;
return this;
}
public Builder setTpcdsProperties(Map<String, String> tpcdsProperties)
{
this.tpcdsProperties = tpcdsProperties;
return this;
}
public IcebergQueryRunner build()
throws Exception
{
setupLogging();
checkArgument(!extraConnectorProperties.containsKey("iceberg.catalog.type"), "extraConnectorProperties cannot contain iceberg.catalog.type");
checkArgument(!extraConnectorProperties.containsKey("iceberg.file-format"), "extraConnectorProperties cannot contain iceberg.file-format");
ImmutableMap.Builder<String, Map<String, String>> icebergCatalogs = ImmutableMap.builder();
Session session = testSessionBuilder()
.setCatalog(ICEBERG_CATALOG)
.setSchema(schemaName.orElse("tpch"))
.build();
DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
.setExtraProperties(extraProperties)
.setDataDirectory(dataDirectory)
.setNodeCount(nodeCount.orElse(4))
.setExternalWorkerLauncher(externalWorkerLauncher)
.build();
queryRunner.installPlugin(new TpchPlugin());
queryRunner.createCatalog("tpch", "tpch");
queryRunner.installPlugin(new TpcdsPlugin());
queryRunner.createCatalog("tpcds", "tpcds", tpcdsProperties);
queryRunner.getServers().forEach(server -> {
MBeanServer mBeanServer = MBeanServerFactory.newMBeanServer();
server.installPlugin(new IcebergPlugin(mBeanServer));
if (addJmxPlugin) {
server.installPlugin(new JmxPlugin(mBeanServer));
}
});
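
// Assemble the connector properties: file format and catalog type first, then
// catalog-specific settings, with caller-supplied overrides applied last.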
Path icebergDataDirectory = getIcebergDataDirectoryPath(queryRunner.getCoordinator().getDataDirectory(), catalogType.name(), format, addStorageFormatToPath);
Map<String, String> icebergProperties = new HashMap<>();
icebergProperties.put("iceberg.file-format", format.name());
icebergProperties.put("iceberg.catalog.type", catalogType.name());
icebergProperties.putAll(getConnectorProperties(catalogType, icebergDataDirectory));
icebergProperties.putAll(extraConnectorProperties);
queryRunner.createCatalog(ICEBERG_CATALOG, "iceberg", ImmutableMap.copyOf(icebergProperties));
icebergCatalogs.put(ICEBERG_CATALOG, ImmutableMap.copyOf(icebergProperties));
if (addJmxPlugin) {
queryRunner.createCatalog("jmx", "jmx");
}
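
// A file-based Hive metastore may already contain these schemas (for example,
// when reusing a data directory), so create them only if they are absent.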
if (catalogType == HIVE) {
ExtendedHiveMetastore metastore = getFileHiveMetastore(icebergDataDirectory);
if (!metastore.getDatabase(METASTORE_CONTEXT, "tpch").isPresent()) {
queryRunner.execute("CREATE SCHEMA tpch");
}
if (!metastore.getDatabase(METASTORE_CONTEXT, "tpcds").isPresent()) {
queryRunner.execute("CREATE SCHEMA tpcds");
}
}
else {
queryRunner.execute("CREATE SCHEMA tpch");
queryRunner.execute("CREATE SCHEMA tpcds");
}
if (createTpchTables) {
copyTpchTables(queryRunner, "tpch", TINY_SCHEMA_NAME, session, TpchTable.getTables(), true);
}
return new IcebergQueryRunner(queryRunner, icebergCatalogs.build());
}
}
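
/**
 * Creates a file-based Hive metastore rooted at the given data directory. Used
 * to check whether the tpch/tpcds schemas already exist for HIVE catalogs.
 */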
private static ExtendedHiveMetastore getFileHiveMetastore(Path dataDirectory)
{
HiveClientConfig hiveClientConfig = new HiveClientConfig();
MetastoreClientConfig metastoreClientConfig = new MetastoreClientConfig();
HdfsConfiguration hdfsConfiguration = new HiveHdfsConfiguration(new HdfsConfigurationInitializer(hiveClientConfig, metastoreClientConfig), ImmutableSet.of(), hiveClientConfig);
HdfsEnvironment hdfsEnvironment = new HdfsEnvironment(hdfsConfiguration, metastoreClientConfig, new NoHdfsAuthentication());
return new IcebergFileHiveMetastore(hdfsEnvironment, dataDirectory.toFile().toURI().toString(), "test");
}
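
/**
 * Resolves the Iceberg data location under the runner's data directory:
 * {@code <dataDirectory>/iceberg_data[/<format>]/<catalogType>}.
 */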
public static Path getIcebergDataDirectoryPath(Path dataDirectory, String catalogType, FileFormat format, boolean addStorageFormatToPath)
{
Path icebergDataDirectory = addStorageFormatToPath
        ? dataDirectory.resolve(TEST_DATA_DIRECTORY).resolve(format.name())
        : dataDirectory.resolve(TEST_DATA_DIRECTORY);
return icebergDataDirectory.resolve(catalogType);
}
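
/**
 * Returns the connector properties required for the given catalog type. HADOOP,
 * REST, and NESSIE catalogs get an eagerly created warehouse directory; HIVE is
 * wired to a file-based metastore under the data directory.
 */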
public static Map<String, String> getConnectorProperties(CatalogType icebergCatalogType, Path icebergDataDirectory)
{
switch (icebergCatalogType) {
case HADOOP:
case REST:
case NESSIE:
try {
if (!Files.exists(icebergDataDirectory)) {
Files.createDirectories(icebergDataDirectory);
}
}
catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "cannot create Iceberg catalog directory " + icebergDataDirectory, e);
}
return ImmutableMap.of("iceberg.catalog.warehouse", icebergDataDirectory.toFile().toURI().toString());
case HIVE:
return ImmutableMap.of(
"hive.metastore", "file",
"hive.metastore.catalog.dir", icebergDataDirectory.toFile().toURI().toString());
}
throw new PrestoException(NOT_SUPPORTED, "Unsupported Presto Iceberg catalog type " + icebergCatalogType);
}
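
// Quiet noisy loggers so that test and server output stays readable.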
private static void setupLogging()
{
Logging logging = Logging.initialize();
logging.setLevel("com.facebook.presto.event", WARN);
logging.setLevel("com.facebook.presto.security.AccessControlManager", WARN);
logging.setLevel("com.facebook.presto.server.PluginManager", WARN);
logging.setLevel("com.facebook.airlift.bootstrap.LifeCycleManager", WARN);
logging.setLevel("org.apache.parquet.hadoop", WARN);
logging.setLevel("org.eclipse.jetty.server.handler.ContextHandler", WARN);
logging.setLevel("org.eclipse.jetty.server.AbstractConnector", WARN);
logging.setLevel("org.glassfish.jersey.internal.inject.Providers", ERROR);
logging.setLevel("parquet.hadoop", WARN);
logging.setLevel("org.apache.iceberg", WARN);
logging.setLevel("com.facebook.airlift.bootstrap", WARN);
logging.setLevel("Bootstrap", WARN);
logging.setLevel("org.apache.hadoop.io.compress", WARN);
}
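
/**
 * Starts a standalone coordinator on HTTP port 8080 for manual testing. An
 * optional data directory may be passed as the single command-line argument.
 */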
public static void main(String[] args)
throws Exception
{
setupLogging();
Optional<Path> dataDirectory;
if (args.length > 0) {
if (args.length != 1) {
log.error("usage: IcebergQueryRunner [dataDirectory]\n");
log.error(" [dataDirectory] is a local directory under which you want the iceberg_data directory to be created.]\n");
System.exit(1);
}
dataDirectory = getDataDirectoryPath(Optional.of(args[0]));
}
else {
dataDirectory = getDataDirectoryPath(Optional.empty());
}
Map<String, String> properties = ImmutableMap.of("http-server.http.port", "8080");
DistributedQueryRunner queryRunner = null;
try {
queryRunner = builder()
.setExtraProperties(properties)
.setDataDirectory(dataDirectory)
.build()
.getQueryRunner();
}
catch (Throwable t) {
log.error(t);
System.exit(1);
}
Thread.sleep(10);
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}
}