PrestoSparkNativeQueryRunnerUtils.java
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spark;
import com.facebook.airlift.log.Logging;
import com.facebook.presto.hive.metastore.Database;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils;
import com.facebook.presto.spark.execution.nativeprocess.NativeExecutionModule;
import com.facebook.presto.spark.execution.property.NativeExecutionConnectorConfig;
import com.facebook.presto.spi.security.PrincipalType;
import com.facebook.presto.testing.QueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Module;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;
import static com.facebook.airlift.log.Level.WARN;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerHiveProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.getNativeQueryRunnerParameters;
import static com.facebook.presto.spark.PrestoSparkQueryRunner.METASTORE_CONTEXT;
/**
* The following JVM arguments are used to run Spark native tests.
* <p>
* - PRESTO_SERVER
* - This tells Spark where to find the Presto native binary to launch the process.
* Example: -DPRESTO_SERVER=/path/to/native/process/bin
* <p>
* - DATA_DIR
* - Optional path to store TPC-H tables used in the test. If this directory is empty, it will be
* populated. If the tables already exist, they will be reused.
* <p>
* Tests can be run in Interactive Debugging Mode, which allows for an easier debugging
* experience. Instead of launching its own native process, the test will connect to an existing
* native process. This gives developers flexibility to connect IDEA and debuggers to the native process.
* Enable this mode by setting NATIVE_PORT JVM argument.
* <p>
* - NATIVE_PORT
* - This is the port your externally launched native process listens to. It is used to tell Spark where to send
* requests. This port number must match the port on which your externally launched process listens.
* Example: -DNATIVE_PORT=7777.
* When NATIVE_PORT is specified, the PRESTO_SERVER argument is not required and is ignored if specified.
* <p>
* For test queries requiring shuffle, the disk-based local shuffle will be used.
*/
public class PrestoSparkNativeQueryRunnerUtils
{
// Passed as the last argument to the PrestoSparkQueryRunner constructor in createRunner;
// presumably the CPU count made available to the test runner (confirm against that constructor).
private static final int AVAILABLE_CPU_COUNT = 4;
// Spark property key set by getNativeExecutionShuffleConfigs to the native execution shuffle manager class.
private static final String SPARK_SHUFFLE_MANAGER = "spark.shuffle.manager";
// Spark property key set by getNativeExecutionShuffleConfigs to Spark's SortShuffleManager class.
private static final String FALLBACK_SPARK_SHUFFLE_MANAGER = "spark.fallback.shuffle.manager";
// Storage format name handed to the Java query runner builder; createRunner also appends it
// to the base data directory as a subdirectory name.
private static final String DEFAULT_STORAGE_FORMAT = "DWRF";
// Cached base data directory; stays empty until the first getBaseDataPath() call fills it in.
private static Optional<Path> dataDirectory = Optional.empty();
// Private constructor: this class only exposes static helpers and is not meant to be instantiated.
private PrestoSparkNativeQueryRunnerUtils() {}
/**
 * Assembles the config properties used by the native execution test runners.
 * <p>
 * The native executable path is only added when the {@code NATIVE_PORT} system property is
 * not set. Every call creates a fresh temporary directory and registers it as the broadcast
 * base path.
 *
 * @return immutable map of property name to property value
 * @throws UncheckedIOException if the temporary broadcast directory cannot be created
 */
public static Map<String, String> getNativeExecutionSparkConfigs()
{
    ImmutableMap.Builder<String, String> configs = ImmutableMap.builder();
    // Do not use default Prestissimo config files. Presto-Spark will generate the configs on-the-fly.
    configs.put("catalog.config-dir", "/");
    configs.put("task.info-update-interval", "100ms");
    configs.put("spark.initial-partition-count", "1");
    configs.put("register-test-functions", "true");
    configs.put("native-execution-program-arguments", "--logtostderr=1 --minloglevel=3");
    configs.put("spark.partition-count-auto-tune-enabled", "false");

    // Without NATIVE_PORT the test has to launch the native process itself, so it needs the binary.
    boolean launchesOwnProcess = System.getProperty("NATIVE_PORT") == null;
    if (launchesOwnProcess) {
        configs.put("native-execution-executable-path", getNativeQueryRunnerParameters().serverBinary.toString());
    }

    Path broadcastDirectory;
    try {
        broadcastDirectory = Files.createTempDirectory("native_broadcast");
    }
    catch (IOException e) {
        throw new UncheckedIOException("Error creating temporary directory for broadcast", e);
    }
    configs.put("native-execution-broadcast-base-path", broadcastDirectory.toAbsolutePath().toString());

    return configs.build();
}
/**
 * Creates a query runner with "hive" as the default catalog and a default
 * {@link NativeExecutionModule}, then sets up the JSON function namespace manager on it
 * using "external_functions.json" and "json".
 *
 * @return the configured query runner
 */
public static PrestoSparkQueryRunner createHiveRunner()
{
    NativeExecutionModule nativeModule = new NativeExecutionModule();
    PrestoSparkQueryRunner hiveRunner = createRunner("hive", nativeModule);
    PrestoNativeQueryRunnerUtils.setupJsonFunctionNamespaceManager(hiveRunner, "external_functions.json", "json");
    return hiveRunner;
}
/**
 * Creates a query runner for the given default catalog, wiring in the cached base data path,
 * the native execution configs, the native shuffle configs and the single supplied module.
 *
 * @param defaultCatalog catalog the runner uses by default
 * @param nativeExecutionModule module passed to the runner as its only native module
 * @return the configured query runner
 */
private static PrestoSparkQueryRunner createRunner(String defaultCatalog, NativeExecutionModule nativeExecutionModule)
{
    // Increases log level to reduce log spamming while running test.
    customizeLogging();

    Optional<Path> baseDirectory = Optional.of(getBaseDataPath());
    Map<String, String> configProperties = getNativeExecutionSparkConfigs();
    Map<String, String> sparkProperties = getNativeExecutionShuffleConfigs();
    ImmutableList<Module> modules = ImmutableList.of(nativeExecutionModule);

    return createRunner(defaultCatalog, baseDirectory, configProperties, sparkProperties, modules);
}
/**
 * Creates a query runner with "tpchstandard" as the default catalog.
 * <p>
 * Similar to {@link #createHiveRunner()}, but the native execution module is built with a
 * custom connector config (connector name "tpch") and no JSON function namespace manager
 * is set up.
 *
 * @return the configured query runner
 */
public static PrestoSparkQueryRunner createTpchRunner()
{
    NativeExecutionModule tpchModule = new NativeExecutionModule(
            Optional.of(new NativeExecutionConnectorConfig().setConnectorName("tpch")));
    return createRunner("tpchstandard", tpchModule);
}
/**
 * Creates a {@link PrestoSparkQueryRunner} and makes sure the "tpch" database exists in its
 * metastore.
 *
 * @param defaultCatalog catalog the runner uses by default
 * @param baseDir optional base data directory; when present, the storage format name
 *         ({@code DEFAULT_STORAGE_FORMAT}) is appended to it as a subdirectory
 * @param additionalConfigProperties config properties added on top of the native worker
 *         system properties; the combined map is built with an {@link ImmutableMap} builder
 * @param additionalSparkProperties Spark properties handed to the runner as-is
 * @param nativeModules modules handed to the runner as-is
 * @return the configured query runner
 */
public static PrestoSparkQueryRunner createRunner(String defaultCatalog, Optional<Path> baseDir, Map<String, String> additionalConfigProperties, Map<String, String> additionalSparkProperties, ImmutableList<Module> nativeModules)
{
    ImmutableMap.Builder<String, String> configBuilder = ImmutableMap.builder();
    configBuilder.putAll(getNativeWorkerSystemProperties()).putAll(additionalConfigProperties);

    // Join the storage-format subdirectory as a separate path element instead of concatenating
    // strings with a hard-coded '/'. The file system supplies the separator, and an empty
    // relative base path no longer turns into the absolute path "/DWRF".
    Optional<Path> dataDir = baseDir.map(path -> Paths.get(path.toString(), DEFAULT_STORAGE_FORMAT));

    PrestoSparkQueryRunner queryRunner = new PrestoSparkQueryRunner(
            defaultCatalog,
            configBuilder.build(),
            getNativeWorkerHiveProperties(),
            additionalSparkProperties,
            dataDir,
            nativeModules,
            AVAILABLE_CPU_COUNT);

    // Create the "tpch" database only when the metastore does not already have it.
    ExtendedHiveMetastore metastore = queryRunner.getMetastore();
    if (!metastore.getDatabase(METASTORE_CONTEXT, "tpch").isPresent()) {
        metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject("tpch"));
    }
    return queryRunner;
}
/**
 * Builds a query runner from {@code PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder()},
 * configured with {@code DEFAULT_STORAGE_FORMAT} as storage format and with the
 * add-storage-format-to-path option enabled.
 *
 * @return the built query runner
 * @throws Exception if building the query runner fails
 */
public static QueryRunner createJavaQueryRunner()
        throws Exception
{
    QueryRunner javaRunner = PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder()
            .setAddStorageFormatToPath(true)
            .setStorageFormat(DEFAULT_STORAGE_FORMAT)
            .build();
    return javaRunner;
}
/**
 * Initializes logging and raises the "org.apache.spark" and "com.facebook.presto.spark"
 * loggers to WARN.
 */
public static void customizeLogging()
{
    Logging logging = Logging.initialize();
    String[] noisyLoggers = {"org.apache.spark", "com.facebook.presto.spark"};
    for (String loggerName : noisyLoggers) {
        logging.setLevel(loggerName, WARN);
    }
}
/**
 * Returns the base data directory. The first call reads it from
 * {@code getNativeQueryRunnerParameters().dataDirectory} and caches it in the static
 * {@code dataDirectory} field; later calls return the cached value.
 *
 * @return the base data directory
 */
public static synchronized Path getBaseDataPath()
{
    if (!dataDirectory.isPresent()) {
        dataDirectory = Optional.of(getNativeQueryRunnerParameters().dataDirectory);
    }
    return dataDirectory.get();
}
/**
 * Builds a metastore {@link Database} with the given name, owned by the "public" role.
 *
 * @param name database name
 * @return the built database object
 */
private static Database createDatabaseMetastoreObject(String name)
{
    String ownerName = "public";
    PrincipalType ownerType = PrincipalType.ROLE;
    return Database.builder()
            .setDatabaseName(name)
            .setOwnerName(ownerName)
            .setOwnerType(ownerType)
            .build();
}
/**
 * Returns the Spark properties that select the shuffle managers: the native execution
 * shuffle manager as the primary one and Spark's sort shuffle manager as the fallback.
 *
 * @return immutable map holding the two shuffle manager properties
 */
private static Map<String, String> getNativeExecutionShuffleConfigs()
{
    return ImmutableMap.of(
            SPARK_SHUFFLE_MANAGER, "com.facebook.presto.spark.classloader_interface.PrestoSparkNativeExecutionShuffleManager",
            FALLBACK_SPARK_SHUFFLE_MANAGER, "org.apache.spark.shuffle.sort.SortShuffleManager");
}
}