/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.nativeworker;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.functionNamespace.FunctionNamespaceManagerPlugin;
import com.facebook.presto.functionNamespace.json.JsonFileBasedFunctionNamespaceManagerFactory;
import com.facebook.presto.hive.HiveQueryRunner;
import com.facebook.presto.hive.metastore.Column;
import com.facebook.presto.hive.metastore.ExtendedHiveMetastore;
import com.facebook.presto.hive.metastore.PrincipalPrivileges;
import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.hive.metastore.StorageFormat;
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.iceberg.FileFormat;
import com.facebook.presto.iceberg.IcebergQueryRunner;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.io.Resources;
import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
import org.apache.hadoop.hive.ql.io.SymlinkTextInputFormat;
import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe;

import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.UUID;
import java.util.function.BiFunction;

import static com.facebook.presto.common.ErrorType.INTERNAL_ERROR;
import static com.facebook.presto.hive.HiveQueryRunner.METASTORE_CONTEXT;
import static com.facebook.presto.hive.HiveQueryRunner.createDatabaseMetastoreObject;
import static com.facebook.presto.hive.HiveQueryRunner.getFileHiveMetastore;
import static com.facebook.presto.hive.HiveTestUtils.getProperty;
import static com.facebook.presto.hive.metastore.PrestoTableType.EXTERNAL_TABLE;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeSidecarProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerHiveProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerTpcdsProperties;
import static com.facebook.presto.nativeworker.SymlinkManifestGeneratorUtils.createSymlinkManifest;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
import static org.testng.Assert.assertTrue;

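/**
 * Factory methods and helpers for building Hive and Iceberg {@link QueryRunner} instances that run
 * either on native (C++/Velox) workers or on plain Java workers, for use in native execution tests.
 */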
public class PrestoNativeQueryRunnerUtils
{
    public enum QueryRunnerType
    {
        JAVA,
        NATIVE
    }

    // The unix domain socket (UDS) used to communicate with the remote function server.
    public static final String REMOTE_FUNCTION_UDS = "remote_function_server.socket";
    public static final String REMOTE_FUNCTION_JSON_SIGNATURES = "remote_function_server.json";
    public static final String REMOTE_FUNCTION_CATALOG_NAME = "remote";
    public static final String HIVE_DATA = "hive_data";

    protected static final String ICEBERG_DEFAULT_STORAGE_FORMAT = "PARQUET";

    private static final Logger log = Logger.get(PrestoNativeQueryRunnerUtils.class);
    private static final String DEFAULT_STORAGE_FORMAT = "DWRF";
    private static final String SYMLINK_FOLDER = "symlink_tables_manifests";
    private static final PrincipalPrivileges PRINCIPAL_PRIVILEGES = new PrincipalPrivileges(ImmutableMultimap.of(), ImmutableMultimap.of());
    private static final ErrorCode CREATE_ERROR_CODE = new ErrorCode(123, "CREATE_ERROR_CODE", INTERNAL_ERROR);

    private static final StorageFormat STORAGE_FORMAT_SYMLINK_TABLE = StorageFormat.create(
            ParquetHiveSerDe.class.getName(),
            SymlinkTextInputFormat.class.getName(),
            HiveIgnoreKeyTextOutputFormat.class.getName());

    private PrestoNativeQueryRunnerUtils() {}

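    /**
     * Returns a builder for a Hive {@link QueryRunner} backed by native workers.
     *
     * <p>A minimal usage sketch; the storage format and thrift setting shown are illustrative:
     * <pre>{@code
     * QueryRunner queryRunner = nativeHiveQueryRunnerBuilder()
     *         .setStorageFormat("PARQUET")
     *         .setUseThrift(true)
     *         .build();
     * }</pre>
     */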
    public static HiveQueryRunnerBuilder nativeHiveQueryRunnerBuilder()
    {
        return new HiveQueryRunnerBuilder(QueryRunnerType.NATIVE);
    }

    public static HiveQueryRunnerBuilder javaHiveQueryRunnerBuilder()
    {
        return new HiveQueryRunnerBuilder(QueryRunnerType.JAVA);
    }

    public static class HiveQueryRunnerBuilder
    {
        private NativeQueryRunnerParameters nativeQueryRunnerParameters = getNativeQueryRunnerParameters();
        private Path dataDirectory = nativeQueryRunnerParameters.dataDirectory;
        private String serverBinary = nativeQueryRunnerParameters.serverBinary.toString();
        private Integer workerCount = nativeQueryRunnerParameters.workerCount.orElse(4);
        private Integer cacheMaxSize = 0;
        private String storageFormat = DEFAULT_STORAGE_FORMAT;
        private Optional<String> remoteFunctionServerUds = Optional.empty();
        private Map<String, String> extraProperties = new HashMap<>();
        private Map<String, String> extraCoordinatorProperties = new HashMap<>();
        private Map<String, String> hiveProperties = new HashMap<>();
        private Map<String, String> tpcdsProperties = new HashMap<>();
        private String security;
        private boolean addStorageFormatToPath;
        private boolean coordinatorSidecarEnabled;
        private boolean enableRuntimeMetricsCollection;
        private boolean enableSsdCache;
        private boolean failOnNestedLoopJoin;
        // The external worker launcher applies only to the native Hive query runner. Because it depends on the
        // other builder properties, it must be created after all of them are set, which happens in build(). This
        // flag indicates whether the query runner returned by the builder uses an external worker launcher; it is
        // true only for the native query runner and should NOT be set explicitly by users.
        private boolean useExternalWorkerLauncher;

        private HiveQueryRunnerBuilder(QueryRunnerType queryRunnerType)
        {
            this.hiveProperties.putAll(ImmutableMap.<String, String>builder()
                    .put("hive.storage-format", this.storageFormat)
                    .put("hive.pushdown-filter-enabled", "true")
                    .build());
            this.tpcdsProperties.putAll(getNativeWorkerTpcdsProperties());

            if (queryRunnerType.equals(QueryRunnerType.NATIVE)) {
                this.extraProperties.putAll(getNativeWorkerSystemProperties());
                this.extraCoordinatorProperties.putAll(ImmutableMap.<String, String>builder()
                        .put("native-execution-enabled", "true")
                        .put("http-server.http.port", "8081")
                        .build());
                // The property "hive.allow-drop-table" needs to be set to true because security is always "legacy" in NativeQueryRunner.
                this.hiveProperties.putAll(ImmutableMap.<String, String>builder()
                        .putAll(getNativeWorkerHiveProperties())
                        .put("hive.allow-drop-table", "true")
                        .build());
                this.security = "legacy";
                this.useExternalWorkerLauncher = true;
            }
            else {
                this.extraProperties.putAll(ImmutableMap.of(
                        "regex-library", "RE2J",
                        "offset-clause-enabled", "true"));
                this.security = "sql-standard";
                this.useExternalWorkerLauncher = false;
            }
        }

        public HiveQueryRunnerBuilder setRemoteFunctionServerUds(Optional<String> remoteFunctionServerUds)
        {
            this.remoteFunctionServerUds = remoteFunctionServerUds;
            return this;
        }

        public HiveQueryRunnerBuilder setFailOnNestedLoopJoin(boolean failOnNestedLoopJoin)
        {
            this.failOnNestedLoopJoin = failOnNestedLoopJoin;
            return this;
        }

        public HiveQueryRunnerBuilder setUseThrift(boolean useThrift)
        {
            this.extraProperties
                    .put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift));
            return this;
        }

        public HiveQueryRunnerBuilder setSingleNodeExecutionEnabled(boolean singleNodeExecutionEnabled)
        {
            if (singleNodeExecutionEnabled) {
                this.extraCoordinatorProperties.put("single-node-execution-enabled", "true");
            }
            return this;
        }

        public HiveQueryRunnerBuilder setEnableSsdCache(boolean enableSsdCache)
        {
            this.enableSsdCache = enableSsdCache;
            return this;
        }

        public HiveQueryRunnerBuilder setEnableRuntimeMetricsCollection(boolean enableRuntimeMetricsCollection)
        {
            this.enableRuntimeMetricsCollection = enableRuntimeMetricsCollection;
            return this;
        }

        public HiveQueryRunnerBuilder setCoordinatorSidecarEnabled(boolean coordinatorSidecarEnabled)
        {
            this.coordinatorSidecarEnabled = coordinatorSidecarEnabled;
            if (coordinatorSidecarEnabled) {
                this.extraProperties.putAll(getNativeSidecarProperties());
            }
            return this;
        }

        public HiveQueryRunnerBuilder setStorageFormat(String storageFormat)
        {
            this.storageFormat = storageFormat;
            this.hiveProperties.put("hive.storage-format", storageFormat);
            return this;
        }

        public HiveQueryRunnerBuilder setAddStorageFormatToPath(boolean addStorageFormatToPath)
        {
            this.addStorageFormatToPath = addStorageFormatToPath;
            return this;
        }

        public HiveQueryRunnerBuilder setCacheMaxSize(Integer cacheMaxSize)
        {
            this.cacheMaxSize = cacheMaxSize;
            return this;
        }

        public HiveQueryRunnerBuilder setExtraProperties(Map<String, String> extraProperties)
        {
            this.extraProperties.putAll(extraProperties);
            return this;
        }

        public HiveQueryRunnerBuilder setExtraCoordinatorProperties(Map<String, String> extraCoordinatorProperties)
        {
            this.extraCoordinatorProperties.putAll(extraCoordinatorProperties);
            return this;
        }

        public HiveQueryRunnerBuilder setHiveProperties(Map<String, String> hiveProperties)
        {
            this.hiveProperties.putAll(hiveProperties);
            return this;
        }

        public QueryRunner build()
                throws Exception
        {
            Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher = Optional.empty();
            if (this.useExternalWorkerLauncher) {
                externalWorkerLauncher = getExternalWorkerLauncher("hive", serverBinary, cacheMaxSize, remoteFunctionServerUds,
                        failOnNestedLoopJoin, coordinatorSidecarEnabled, enableRuntimeMetricsCollection, enableSsdCache);
            }
            return HiveQueryRunner.createQueryRunner(
                    ImmutableList.of(),
                    ImmutableList.of(),
                    extraProperties,
                    extraCoordinatorProperties,
                    security,
                    hiveProperties,
                    Optional.ofNullable(workerCount),
                    Optional.of(addStorageFormatToPath ? dataDirectory.resolve(storageFormat) : dataDirectory),
                    externalWorkerLauncher,
                    tpcdsProperties);
        }
    }

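    /**
     * Returns a builder for an Iceberg {@link QueryRunner} backed by native workers.
     *
     * <p>A minimal usage sketch; the storage format shown is illustrative (the default is
     * {@code PARQUET}):
     * <pre>{@code
     * QueryRunner queryRunner = nativeIcebergQueryRunnerBuilder()
     *         .setStorageFormat("ORC")
     *         .setAddStorageFormatToPath(true)
     *         .build();
     * }</pre>
     */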
    public static IcebergQueryRunnerBuilder nativeIcebergQueryRunnerBuilder()
    {
        return new IcebergQueryRunnerBuilder(QueryRunnerType.NATIVE);
    }

    public static IcebergQueryRunnerBuilder javaIcebergQueryRunnerBuilder()
    {
        return new IcebergQueryRunnerBuilder(QueryRunnerType.JAVA);
    }

    public static class IcebergQueryRunnerBuilder
    {
        private NativeQueryRunnerParameters nativeQueryRunnerParameters = getNativeQueryRunnerParameters();
        private Path dataDirectory = nativeQueryRunnerParameters.dataDirectory;
        private String serverBinary = nativeQueryRunnerParameters.serverBinary.toString();
        private Integer workerCount = nativeQueryRunnerParameters.workerCount.orElse(4);
        private Integer cacheMaxSize = 0;
        private String storageFormat = ICEBERG_DEFAULT_STORAGE_FORMAT;
        private Map<String, String> extraProperties = new HashMap<>();
        private Map<String, String> extraConnectorProperties = new HashMap<>();
        private Optional<String> remoteFunctionServerUds = Optional.empty();
        private boolean addStorageFormatToPath;
        // The external worker launcher applies only to the native Iceberg query runner. Because it depends on the
        // other builder properties, it must be created after all of them are set, which happens in build(). This
        // flag indicates whether the query runner returned by the builder uses an external worker launcher; it is
        // true only for the native query runner and should NOT be set explicitly by users.
        private boolean useExternalWorkerLauncher;

        private IcebergQueryRunnerBuilder(QueryRunnerType queryRunnerType)
        {
            if (queryRunnerType.equals(QueryRunnerType.NATIVE)) {
                this.extraProperties.putAll(ImmutableMap.<String, String>builder()
                        .put("http-server.http.port", "8080")
                        .put("query.max-stage-count", "110")
                        .putAll(getNativeWorkerSystemProperties())
                        .build());
                this.useExternalWorkerLauncher = true;
            }
            else {
                this.extraProperties.putAll(ImmutableMap.of(
                        "regex-library", "RE2J",
                        "offset-clause-enabled", "true",
                        "query.max-stage-count", "110"));
                this.extraConnectorProperties.putAll(ImmutableMap.of("hive.parquet.writer.version", "PARQUET_1_0"));
                this.useExternalWorkerLauncher = false;
            }
        }

        public IcebergQueryRunnerBuilder setStorageFormat(String storageFormat)
        {
            this.storageFormat = storageFormat;
            return this;
        }

        public IcebergQueryRunnerBuilder setAddStorageFormatToPath(boolean addStorageFormatToPath)
        {
            this.addStorageFormatToPath = addStorageFormatToPath;
            return this;
        }

        public IcebergQueryRunnerBuilder setUseThrift(boolean useThrift)
        {
            this.extraProperties
                    .put("experimental.internal-communication.thrift-transport-enabled", String.valueOf(useThrift));
            return this;
        }

        public QueryRunner build()
                throws Exception
        {
            Optional<BiFunction<Integer, URI, Process>> externalWorkerLauncher = Optional.empty();
            if (this.useExternalWorkerLauncher) {
                externalWorkerLauncher = getExternalWorkerLauncher("iceberg", serverBinary, cacheMaxSize, remoteFunctionServerUds,
                        false, false, false, false);
            }
            return IcebergQueryRunner.builder()
                    .setExtraProperties(extraProperties)
                    .setExtraConnectorProperties(extraConnectorProperties)
                    .setFormat(FileFormat.valueOf(storageFormat))
                    .setCreateTpchTables(false)
                    .setAddJmxPlugin(false)
                    .setNodeCount(OptionalInt.of(workerCount))
                    .setExternalWorkerLauncher(externalWorkerLauncher)
                    .setAddStorageFormatToPath(addStorageFormatToPath)
                    .setDataDirectory(Optional.of(dataDirectory))
                    .setTpcdsProperties(getNativeWorkerTpcdsProperties())
                    .build().getQueryRunner();
        }
    }

    public static void createSchemaIfNotExist(QueryRunner queryRunner, String schemaName)
    {
        ExtendedHiveMetastore metastore = getFileHiveMetastore((DistributedQueryRunner) queryRunner);
        if (!metastore.getDatabase(METASTORE_CONTEXT, schemaName).isPresent()) {
            metastore.createDatabase(METASTORE_CONTEXT, createDatabaseMetastoreObject(schemaName));
        }
    }

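    /**
     * Registers {@code tableName} from {@code sourceSchemaName} as an external Hive symlink table in
     * {@code targetSchemaName}, first generating a symlink manifest that points at the source table's
     * data files.
     *
     * <p>A usage sketch; the schema and table names are illustrative:
     * <pre>{@code
     * List<Column> columns = ...; // metastore column definitions matching the source table
     * createExternalTable(queryRunner, "tpch", "orders", columns, "tpch_symlink");
     * }</pre>
     */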
    public static void createExternalTable(QueryRunner queryRunner, String sourceSchemaName, String tableName, List<Column> columns, String targetSchemaName)
    {
        ExtendedHiveMetastore metastore = getFileHiveMetastore((DistributedQueryRunner) queryRunner);
        File dataDirectory = ((DistributedQueryRunner) queryRunner).getCoordinator().getDataDirectory().resolve(HIVE_DATA).toFile();
        Path hiveTableDataPath = dataDirectory.toPath().resolve(sourceSchemaName).resolve(tableName);
        Path symlinkTableDataPath = dataDirectory.toPath().getParent().resolve(SYMLINK_FOLDER).resolve(tableName);

        try {
            createSymlinkManifest(hiveTableDataPath, symlinkTableDataPath);
        }
        catch (IOException e) {
            throw new PrestoException(() -> CREATE_ERROR_CODE, "Failed to create symlink manifest file for table: " + tableName, e);
        }

        createSchemaIfNotExist(queryRunner, targetSchemaName);
        if (!metastore.getTable(METASTORE_CONTEXT, targetSchemaName, tableName).isPresent()) {
            metastore.createTable(METASTORE_CONTEXT, createHiveSymlinkTable(targetSchemaName, tableName, columns, symlinkTableDataPath.toString()), PRINCIPAL_PRIVILEGES, emptyList());
        }
    }

    /**
     * Starts the remote function server and returns the UDS path used to communicate with it.
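     *
     * <p>A minimal wiring sketch; the server binary path is illustrative:
     * <pre>{@code
     * String uds = startRemoteFunctionServer("/path/to/remote_function_server");
     * QueryRunner runner = nativeHiveQueryRunnerBuilder()
     *         .setRemoteFunctionServerUds(Optional.of(uds))
     *         .build();
     * }</pre>
     */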
    public static String startRemoteFunctionServer(String remoteFunctionServerBinaryPath)
    {
        try {
            Path tempDirectoryPath = Files.createTempDirectory("RemoteFunctionServer");
            Path remoteFunctionServerUdsPath = tempDirectoryPath.resolve(REMOTE_FUNCTION_UDS);
            log.info("Temp directory for Remote Function Server: %s", tempDirectoryPath.toString());

            // The Process handle is intentionally not retained; callers interact with the server
            // through the returned UDS path.
            new ProcessBuilder(Paths.get(remoteFunctionServerBinaryPath).toAbsolutePath().toString(), "--uds_path", remoteFunctionServerUdsPath.toString(), "--function_prefix", REMOTE_FUNCTION_CATALOG_NAME + ".schema.")
                    .directory(tempDirectoryPath.toFile())
                    // Keep stdout and stderr in separate files; redirectErrorStream(true) would make the
                    // stderr redirect below a no-op and leave thrift_server.err permanently empty.
                    .redirectOutput(ProcessBuilder.Redirect.to(tempDirectoryPath.resolve("thrift_server.out").toFile()))
                    .redirectError(ProcessBuilder.Redirect.to(tempDirectoryPath.resolve("thrift_server.err").toFile()))
                    .start();
            return remoteFunctionServerUdsPath.toString();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

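    /**
     * Resolves the native worker binary, the data directory, and the worker count from the
     * {@code PRESTO_SERVER}, {@code DATA_DIR}, and {@code WORKER_COUNT} system properties,
     * creating the data directory if needed and falling back to defaults when a property is unset.
     * These are typically supplied as JVM arguments, for example:
     * {@code -DPRESTO_SERVER=/path/to/presto_server -DDATA_DIR=/tmp/velox_data -DWORKER_COUNT=4}.
     */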
    public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
    {
        Path prestoServerPath = Paths.get(getProperty("PRESTO_SERVER")
                        .orElse("_build/debug/presto_cpp/main/presto_server"))
                .toAbsolutePath();
        Path dataDirectory = Paths.get(getProperty("DATA_DIR")
                        .orElse("target/velox_data"))
                .toAbsolutePath();
        Optional<Integer> workerCount = getProperty("WORKER_COUNT").map(Integer::parseInt);

        assertTrue(Files.exists(prestoServerPath), format("Native worker binary at %s not found. Add -DPRESTO_SERVER=<path/to/presto_server> to your JVM arguments.", prestoServerPath));
        log.info("Using PRESTO_SERVER binary at %s", prestoServerPath);

        if (!Files.exists(dataDirectory)) {
            assertTrue(dataDirectory.toFile().mkdirs());
        }

        assertTrue(Files.exists(dataDirectory), format("Data directory at %s is missing. Add -DDATA_DIR=<path/to/data> to your JVM arguments to specify the path.", dataDirectory));
        log.info("Using DATA_DIR at %s", dataDirectory);

        return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount);
    }

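    /**
     * Returns a launcher that provisions one native worker per invocation: it writes
     * {@code config.properties}, {@code node.properties}, and a {@code catalog/} directory under a
     * fresh temp directory, then starts the native {@code presto_server} binary pointed at the
     * coordinator's discovery URI.
     *
     * <p>With all optional flags disabled, the generated {@code config.properties} looks like:
     * <pre>{@code
     * discovery.uri=<discoveryUri>
     * presto.version=testversion
     * system-memory-gb=4
     * http-server.http.port=0
     * }</pre>
     */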
    public static Optional<BiFunction<Integer, URI, Process>> getExternalWorkerLauncher(
            String catalogName,
            String prestoServerPath,
            int cacheMaxSize,
            Optional<String> remoteFunctionServerUds,
            Boolean failOnNestedLoopJoin,
            boolean isCoordinatorSidecarEnabled,
            boolean enableRuntimeMetricsCollection,
            boolean enableSsdCache)
    {
        return
                Optional.of((workerIndex, discoveryUri) -> {
                    try {
                        Path dir = Paths.get("/tmp", PrestoNativeQueryRunnerUtils.class.getSimpleName());
                        Files.createDirectories(dir);
                        Path tempDirectoryPath = Files.createTempDirectory(dir, "worker");
                        log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString());

                        // Write config file - use an ephemeral port for the worker.
                        String configProperties = format("discovery.uri=%s%n" +
                                "presto.version=testversion%n" +
                                "system-memory-gb=4%n" +
                                "http-server.http.port=0%n", discoveryUri);

                        if (isCoordinatorSidecarEnabled) {
                            configProperties = format("%s%n" +
                                    "native-sidecar=true%n" +
                                    "presto.default-namespace=native.default%n", configProperties);
                        }

                        if (enableRuntimeMetricsCollection) {
                            configProperties = format("%s%n" +
                                    "runtime-metrics-collection-enabled=true%n", configProperties);
                        }

                        if (enableSsdCache) {
                            Path ssdCacheDir = tempDirectoryPath.resolve("velox-ssd-cache");
                            Files.createDirectories(ssdCacheDir);
                            configProperties = format("%s%n" +
                                    "async-cache-ssd-gb=1%n" +
                                    "async-cache-ssd-path=%s/%n", configProperties, ssdCacheDir);
                        }

                        if (remoteFunctionServerUds.isPresent()) {
                            String jsonSignaturesPath = Resources.getResource(REMOTE_FUNCTION_JSON_SIGNATURES).getFile();
                            configProperties = format("%s%n" +
                                    "remote-function-server.catalog-name=%s%n" +
                                    "remote-function-server.thrift.uds-path=%s%n" +
                                    "remote-function-server.serde=presto_page%n" +
                                    "remote-function-server.signature.files.directory.path=%s%n", configProperties, REMOTE_FUNCTION_CATALOG_NAME, remoteFunctionServerUds.get(), jsonSignaturesPath);
                        }

                        if (failOnNestedLoopJoin) {
                            configProperties = format("%s%n" + "velox-plan-validator-fail-on-nested-loop-join=true%n", configProperties);
                        }

                        Files.write(tempDirectoryPath.resolve("config.properties"), configProperties.getBytes());
                        Files.write(tempDirectoryPath.resolve("node.properties"),
                                format("node.id=%s%n" +
                                        "node.internal-address=127.0.0.1%n" +
                                        "node.environment=testing%n" +
                                        "node.location=test-location", UUID.randomUUID()).getBytes());

                        Path catalogDirectoryPath = tempDirectoryPath.resolve("catalog");
                        Files.createDirectory(catalogDirectoryPath);
                        if (cacheMaxSize > 0) {
                            Files.write(catalogDirectoryPath.resolve(format("%s.properties", catalogName)),
                                    format("connector.name=hive%n" +
                                            "cache.enabled=true%n" +
                                            "cache.max-cache-size=%s", cacheMaxSize).getBytes());
                        }
                        else {
                            Files.write(catalogDirectoryPath.resolve(format("%s.properties", catalogName)),
                                    "connector.name=hive".getBytes());
                        }
                        // Add catalog with caching always enabled.
                        Files.write(catalogDirectoryPath.resolve(format("%scached.properties", catalogName)),
                                format("connector.name=hive%n" +
                                        "cache.enabled=true%n" +
                                        "cache.max-cache-size=32").getBytes());

                        // Add a tpch catalog.
                        Files.write(catalogDirectoryPath.resolve("tpchstandard.properties"),
                                format("connector.name=tpch%n").getBytes());

                        // Disable stack trace capturing as some queries (using TRY) generate a lot of exceptions.
                        return new ProcessBuilder(prestoServerPath, "--logtostderr=1", "--v=1", "--velox_ssd_odirect=false")
                                .directory(tempDirectoryPath.toFile())
                                // redirectErrorStream(true) merges stderr into stdout, so a single
                                // output file captures both streams; a separate stderr redirect
                                // would be silently ignored here.
                                .redirectErrorStream(true)
                                .redirectOutput(ProcessBuilder.Redirect.to(tempDirectoryPath.resolve("worker." + workerIndex + ".out").toFile()))
                                .start();
                    }
                    catch (IOException e) {
                        throw new UncheckedIOException(e);
                    }
                });
    }

    public static class NativeQueryRunnerParameters
    {
        public final Path serverBinary;
        public final Path dataDirectory;
        public final Optional<Integer> workerCount;

        public NativeQueryRunnerParameters(Path serverBinary, Path dataDirectory, Optional<Integer> workerCount)
        {
            this.serverBinary = requireNonNull(serverBinary, "serverBinary is null");
            this.dataDirectory = requireNonNull(dataDirectory, "dataDirectory is null");
            this.workerCount = requireNonNull(workerCount, "workerCount is null");
        }
    }

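    /**
     * Installs the {@link FunctionNamespaceManagerPlugin} on the query runner and loads a JSON
     * file based function namespace manager for {@code catalogName} from the given classpath
     * resource.
     *
     * <p>A usage sketch; the resource and catalog names are illustrative:
     * <pre>{@code
     * setupJsonFunctionNamespaceManager(queryRunner, "external_functions.json", "json");
     * }</pre>
     */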
    public static void setupJsonFunctionNamespaceManager(QueryRunner queryRunner, String jsonFileName, String catalogName)
    {
        String jsonDefinitionPath = Resources.getResource(jsonFileName).getFile();
        queryRunner.installPlugin(new FunctionNamespaceManagerPlugin());
        queryRunner.loadFunctionNamespaceManager(
                JsonFileBasedFunctionNamespaceManagerFactory.NAME,
                catalogName,
                ImmutableMap.of(
                        "supported-function-languages", "CPP",
                        "function-implementation-type", "CPP",
                        "json-based-function-manager.path-to-function-definition", jsonDefinitionPath));
    }

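    /**
     * Builds the metastore {@link Table} definition for an external symlink table: Parquet data
     * read through {@link SymlinkTextInputFormat}, stored at {@code file:<location>}.
     */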
    private static Table createHiveSymlinkTable(String databaseName, String tableName, List<Column> columns, String location)
    {
        return new Table(
                Optional.of("catalogName"),
                databaseName,
                tableName,
                "hive",
                EXTERNAL_TABLE,
                new Storage(STORAGE_FORMAT_SYMLINK_TABLE,
                        "file:" + location,
                        Optional.empty(),
                        false,
                        ImmutableMap.of(),
                        ImmutableMap.of()),
                columns,
                ImmutableList.of(),
                ImmutableMap.of(),
                Optional.empty(),
                Optional.empty());
    }
}