SessionPropertyManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.metadata;

import com.facebook.airlift.json.JsonCodec;
import com.facebook.airlift.json.JsonCodecFactory;
import com.facebook.airlift.log.Logger;
import com.facebook.presto.Session;
import com.facebook.presto.SystemSessionProperties;
import com.facebook.presto.common.block.BlockBuilder;
import com.facebook.presto.common.type.ArrayType;
import com.facebook.presto.common.type.BigintType;
import com.facebook.presto.common.type.BooleanType;
import com.facebook.presto.common.type.DoubleType;
import com.facebook.presto.common.type.IntegerType;
import com.facebook.presto.common.type.MapType;
import com.facebook.presto.common.type.TinyintType;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.VarcharType;
import com.facebook.presto.sessionpropertyproviders.JavaWorkerSessionPropertyProvider;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.session.PropertyMetadata;
import com.facebook.presto.spi.session.SessionPropertyContext;
import com.facebook.presto.spi.session.WorkerSessionPropertyProvider;
import com.facebook.presto.spi.session.WorkerSessionPropertyProviderFactory;
import com.facebook.presto.spiller.NodeSpillConfig;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.JavaFeaturesConfig;
import com.facebook.presto.sql.planner.ParameterRewriter;
import com.facebook.presto.sql.tree.Expression;
import com.facebook.presto.sql.tree.ExpressionTreeRewriter;
import com.facebook.presto.sql.tree.NodeRef;
import com.facebook.presto.sql.tree.Parameter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;

import javax.annotation.Nullable;
import javax.inject.Inject;

import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

import static com.facebook.presto.common.type.TypeUtils.writeNativeValue;
import static com.facebook.presto.spi.StandardErrorCode.INVALID_SESSION_PROPERTY;
import static com.facebook.presto.sql.planner.ExpressionInterpreter.evaluateConstantExpression;
import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.MoreObjects.firstNonNull;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.io.Files.getNameWithoutExtension;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.TimeUnit.HOURS;

public final class SessionPropertyManager
{
    // Factory used to build the JSON codecs that (de)serialize array- and map-typed session property values.
    private static final JsonCodecFactory JSON_CODEC_FACTORY = new JsonCodecFactory();
    private static final Logger log = Logger.get(SessionPropertyManager.class);
    // Key that every session property provider configuration file must define with a non-empty value.
    private static final String SESSION_PROPERTY_PROVIDER_NAME = "session-property-provider.name";

    // System session properties, keyed by property name.
    private final ConcurrentMap<String, PropertyMetadata<?>> systemSessionProperties = new ConcurrentHashMap<>();
    // Per-connector session properties: connector id -> (property name -> metadata).
    private final ConcurrentMap<ConnectorId, Map<String, PropertyMetadata<?>>> connectorSessionProperties = new ConcurrentHashMap<>();
    // Worker session property providers, keyed by provider name.
    private final Map<String, WorkerSessionPropertyProvider> workerSessionPropertyProviders;
    // Factories used to create worker session property providers, keyed by factory name.
    private final Map<String, WorkerSessionPropertyProviderFactory> workerSessionPropertyProviderFactories = new ConcurrentHashMap<>();
    // Cached name -> metadata view of the properties exposed by all worker providers (the cache expires after one hour).
    private final Supplier<Map<String, PropertyMetadata<?>>> memoizedWorkerSessionProperties;
    private final Optional<NodeManager> nodeManager;
    private final Optional<TypeManager> functionAndTypeManager;
    // Directory scanned for "*.properties" session property provider configuration files.
    private final File configDir;
    // Flipped to true by the first call to loadSessionPropertyProviders(); later calls return immediately.
    private final AtomicBoolean sessionPropertyProvidersLoading = new AtomicBoolean();

    /**
     * Injection entry point. Extracts the property list from {@code systemSessionProperties},
     * wraps the (possibly null) managers in {@link Optional}s, and delegates to the
     * list-based constructor.
     */
    @Inject
    public SessionPropertyManager(
            SystemSessionProperties systemSessionProperties,
            Map<String, WorkerSessionPropertyProvider> workerSessionPropertyProviders,
            FunctionAndTypeManager functionAndTypeManager,
            NodeManager nodeManager,
            SessionPropertyProviderConfig config)
    {
        // Optional.ofNullable: a null manager becomes Optional.empty() rather than failing here.
        this(systemSessionProperties.getSessionProperties(), workerSessionPropertyProviders, Optional.ofNullable(functionAndTypeManager), Optional.ofNullable(nodeManager), config);
    }

    /**
     * Creates a manager pre-populated with the given system session properties and
     * worker session property providers.
     *
     * @param sessionProperties system session properties to register; names must be unique
     * @param workerSessionPropertyProviders initial providers, copied into an internal concurrent map
     * @param functionAndTypeManager type manager later handed to provider factories, if any
     * @param nodeManager node manager later handed to provider factories, if any
     * @param config supplies the directory holding provider configuration files
     */
    public SessionPropertyManager(
            List<PropertyMetadata<?>> sessionProperties,
            Map<String, WorkerSessionPropertyProvider> workerSessionPropertyProviders,
            Optional<TypeManager> functionAndTypeManager,
            Optional<NodeManager> nodeManager,
            SessionPropertyProviderConfig config)
    {
        requireNonNull(nodeManager, "nodeManager is null");
        requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
        this.nodeManager = nodeManager;
        this.functionAndTypeManager = functionAndTypeManager;

        // The aggregated worker-property view is cached and recomputed after the one hour expiry.
        this.memoizedWorkerSessionProperties = Suppliers.memoizeWithExpiration(this::getWorkerSessionProperties, 1, HOURS);

        // Defensive copy: providers loaded later are added to this map, not to the caller's.
        this.workerSessionPropertyProviders = new ConcurrentHashMap<>(workerSessionPropertyProviders);

        requireNonNull(config, "config is null");
        this.configDir = config.getSessionPropertyProvidersConfigurationDir();

        addSystemSessionProperties(sessionProperties);
    }

    /**
     * Creates a testing manager from a freshly constructed {@link SystemSessionProperties}
     * and default Java-worker configuration objects.
     */
    public static SessionPropertyManager createTestingSessionPropertyManager()
    {
        List<PropertyMetadata<?>> defaultProperties = new SystemSessionProperties().getSessionProperties();
        JavaFeaturesConfig javaFeaturesConfig = new JavaFeaturesConfig();
        NodeSpillConfig nodeSpillConfig = new NodeSpillConfig();
        return createTestingSessionPropertyManager(defaultProperties, javaFeaturesConfig, nodeSpillConfig);
    }

    /**
     * Creates a testing manager from the properties exposed by {@code systemSessionProperties}
     * and default Java-worker configuration objects.
     */
    public static SessionPropertyManager createTestingSessionPropertyManager(SystemSessionProperties systemSessionProperties)
    {
        List<PropertyMetadata<?>> properties = systemSessionProperties.getSessionProperties();
        JavaFeaturesConfig javaFeaturesConfig = new JavaFeaturesConfig();
        NodeSpillConfig nodeSpillConfig = new NodeSpillConfig();
        return createTestingSessionPropertyManager(properties, javaFeaturesConfig, nodeSpillConfig);
    }

    /**
     * Creates a testing manager from an explicit list of system session properties and
     * default Java-worker configuration objects.
     */
    public static SessionPropertyManager createTestingSessionPropertyManager(List<PropertyMetadata<?>> sessionProperties)
    {
        JavaFeaturesConfig javaFeaturesConfig = new JavaFeaturesConfig();
        NodeSpillConfig nodeSpillConfig = new NodeSpillConfig();
        return createTestingSessionPropertyManager(sessionProperties, javaFeaturesConfig, nodeSpillConfig);
    }

    /**
     * Creates a testing manager with a single worker session property provider registered
     * under the name {@code "java-worker"}. No type manager or node manager is supplied.
     *
     * @param sessionProperties system session properties to register
     * @param javaFeaturesConfig configuration passed to the Java worker provider
     * @param nodeSpillConfig configuration passed to the Java worker provider
     */
    public static SessionPropertyManager createTestingSessionPropertyManager(
            List<PropertyMetadata<?>> sessionProperties,
            JavaFeaturesConfig javaFeaturesConfig,
            NodeSpillConfig nodeSpillConfig)
    {
        WorkerSessionPropertyProvider javaWorkerProvider = new JavaWorkerSessionPropertyProvider(
                new FeaturesConfig(),
                javaFeaturesConfig,
                nodeSpillConfig);
        Map<String, WorkerSessionPropertyProvider> providers = ImmutableMap.of("java-worker", javaWorkerProvider);
        SessionPropertyProviderConfig providerConfig = new SessionPropertyProviderConfig();

        return new SessionPropertyManager(sessionProperties, providers, Optional.empty(), Optional.empty(), providerConfig);
    }

    /**
     * Loads one session property provider per {@code *.properties} file found in the
     * configured directory. Only the first invocation does any work; subsequent calls
     * return immediately.
     *
     * @throws IllegalStateException if a configuration file lacks a non-empty
     *         {@code session-property-provider.name} entry
     * @throws Exception if a configuration file cannot be loaded
     */
    public void loadSessionPropertyProviders()
            throws Exception
    {
        boolean firstCall = sessionPropertyProvidersLoading.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }

        for (File candidate : listFiles(configDir)) {
            boolean isPropertiesFile = candidate.isFile() && candidate.getName().endsWith(".properties");
            if (!isPropertiesFile) {
                continue;
            }

            // The provider is identified by the file's base name.
            // NOTE(review): the value of session-property-provider.name is only checked for
            // presence and then discarded; confirm the file name is the intended lookup key.
            String providerName = getNameWithoutExtension(candidate.getName());
            Map<String, String> loaded = loadProperties(candidate);
            checkState(!isNullOrEmpty(loaded.get(SESSION_PROPERTY_PROVIDER_NAME)),
                    "Session property manager configuration %s does not contain %s",
                    candidate.getAbsoluteFile(),
                    SESSION_PROPERTY_PROVIDER_NAME);

            // Work on a mutable copy so the name entry is not forwarded to the factory.
            Map<String, String> providerProperties = new HashMap<>(loaded);
            providerProperties.remove(SESSION_PROPERTY_PROVIDER_NAME);
            loadSessionPropertyProvider(providerName, providerProperties, functionAndTypeManager, nodeManager);
        }
    }

    /**
     * Creates a worker session property provider using the factory registered under
     * {@code sessionPropertyProviderName} and registers the result under that same name.
     *
     * @param sessionPropertyProviderName name of both the factory to use and the resulting provider
     * @param properties configuration passed to the factory
     * @param typeManager type manager made available to the factory through the context
     * @param nodeManager node manager made available to the factory through the context
     * @throws IllegalStateException if no factory is registered under the given name
     * @throws IllegalArgumentException if a provider with that name is already registered
     */
    public void loadSessionPropertyProvider(String sessionPropertyProviderName, Map<String, String> properties, Optional<TypeManager> typeManager, Optional<NodeManager> nodeManager)
    {
        log.info("-- Loading %s session property provider --", sessionPropertyProviderName);

        WorkerSessionPropertyProviderFactory providerFactory = workerSessionPropertyProviderFactories.get(sessionPropertyProviderName);
        if (providerFactory == null) {
            throw new IllegalStateException("No factory for session property provider : " + sessionPropertyProviderName);
        }

        SessionPropertyContext context = new SessionPropertyContext(typeManager, nodeManager);
        WorkerSessionPropertyProvider createdProvider = providerFactory.create(context, properties);

        // putIfAbsent leaves an existing registration in place when the name is already taken.
        WorkerSessionPropertyProvider existing = workerSessionPropertyProviders.putIfAbsent(sessionPropertyProviderName, createdProvider);
        if (existing != null) {
            throw new IllegalArgumentException("System session property provider is already registered for property provider : " + sessionPropertyProviderName);
        }

        log.info("-- Added session property provider [%s] --", sessionPropertyProviderName);
    }

    /**
     * Returns an immutable copy of the currently registered worker session property
     * providers, keyed by provider name.
     */
    @VisibleForTesting
    public Map<String, WorkerSessionPropertyProvider> getWorkerSessionPropertyProviders()
    {
        return ImmutableMap.copyOf(workerSessionPropertyProviders);
    }

    /**
     * Registers a factory that can later create a worker session property provider.
     * Factories are keyed by {@link WorkerSessionPropertyProviderFactory#getName()}.
     *
     * @param factory the factory to register
     * @throws NullPointerException if {@code factory} is null
     * @throws IllegalArgumentException if a factory with the same name is already registered
     */
    public void addSessionPropertyProviderFactory(WorkerSessionPropertyProviderFactory factory)
    {
        requireNonNull(factory, "factory is null");
        String factoryName = factory.getName();
        if (workerSessionPropertyProviderFactories.putIfAbsent(factoryName, factory) != null) {
            // Pass the name as a format argument: concatenating it into the format string
            // dropped the surrounding spaces and would break on names containing '%'.
            throw new IllegalArgumentException(format("System Session property provider factory '%s' is already registered", factoryName));
        }
    }

    /**
     * Registers each of the given system session properties, in list order.
     *
     * @param systemSessionProperties properties to register
     * @throws IllegalStateException if any property name is already registered
     */
    public void addSystemSessionProperties(List<PropertyMetadata<?>> systemSessionProperties)
    {
        for (PropertyMetadata<?> sessionProperty : systemSessionProperties) {
            addSystemSessionProperty(sessionProperty);
        }
    }

    /**
     * Registers a single system session property under its name.
     *
     * @param sessionProperty the property to register
     * @throws NullPointerException if {@code sessionProperty} is null
     * @throws IllegalStateException if a property with the same name is already registered;
     *         the existing registration is left untouched
     */
    public void addSystemSessionProperty(PropertyMetadata<?> sessionProperty)
    {
        requireNonNull(sessionProperty, "sessionProperty is null");
        // putIfAbsent (not put) so that a rejected duplicate does not overwrite the
        // property that was registered first.
        checkState(systemSessionProperties.putIfAbsent(sessionProperty.getName(), sessionProperty) == null,
                "System session property '%s' are already registered", sessionProperty.getName());
    }

    /**
     * Registers the session properties of a connector, indexed by property name.
     *
     * @param connectorId connector the properties belong to
     * @param properties the connector's session properties
     * @throws IllegalStateException if properties are already registered for {@code connectorId}
     */
    public void addConnectorSessionProperties(ConnectorId connectorId, List<PropertyMetadata<?>> properties)
    {
        requireNonNull(connectorId, "connectorId is null");
        requireNonNull(properties, "properties is null");

        Map<String, PropertyMetadata<?>> indexedByName = Maps.uniqueIndex(properties, PropertyMetadata::getName);
        // An existing registration is kept; the new one is rejected.
        Map<String, PropertyMetadata<?>> previous = connectorSessionProperties.putIfAbsent(connectorId, indexedByName);
        checkState(previous == null, "Session properties for connectorId '%s' are already registered", connectorId);
    }

    /**
     * Removes the session properties registered for a connector, if any.
     *
     * @param connectorId connector whose properties should be dropped
     * @throws NullPointerException if {@code connectorId} is null
     */
    public void removeConnectorSessionProperties(ConnectorId connectorId)
    {
        // Explicit check for a descriptive message, consistent with addConnectorSessionProperties.
        requireNonNull(connectorId, "connectorId is null");
        connectorSessionProperties.remove(connectorId);
    }

    /**
     * Looks up a session property by name, first among the registered system session
     * properties and then among the cached worker session properties.
     *
     * @param name the property name
     * @return the matching metadata, or empty if neither source knows the name
     * @throws NullPointerException if {@code name} is null
     */
    public Optional<PropertyMetadata<?>> getSystemSessionPropertyMetadata(String name)
    {
        requireNonNull(name, "name is null");
        // Read the concurrent map once so the null check and the returned value agree.
        PropertyMetadata<?> systemProperty = systemSessionProperties.get(name);
        if (systemProperty != null) {
            return Optional.of(systemProperty);
        }
        return Optional.ofNullable(memoizedWorkerSessionProperties.get().get(name));
    }

    /**
     * Looks up a connector session property by name.
     *
     * @param connectorId connector that owns the property
     * @param propertyName unqualified property name
     * @return the matching metadata, or empty if the connector has no such property
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if no properties are
     *         registered for the connector, or the registered set is empty
     */
    public Optional<PropertyMetadata<?>> getConnectorSessionPropertyMetadata(ConnectorId connectorId, String propertyName)
    {
        requireNonNull(connectorId, "connectorId is null");
        requireNonNull(propertyName, "propertyName is null");

        Map<String, PropertyMetadata<?>> registered = connectorSessionProperties.get(connectorId);
        // A connector registered with zero properties is reported the same way as an unregistered one.
        boolean hasProperties = registered != null && !registered.isEmpty();
        if (!hasProperties) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Unknown connector " + connectorId);
        }

        PropertyMetadata<?> match = registered.get(propertyName);
        return Optional.ofNullable(match);
    }

    /**
     * Collects the session properties of every registered worker provider into a single
     * name-keyed map. When two properties share a name, the one encountered later wins.
     */
    private Map<String, PropertyMetadata<?>> getWorkerSessionProperties()
    {
        List<PropertyMetadata<?>> allProperties = workerSessionPropertyProviders.values().stream()
                .flatMap(provider -> provider.getSessionProperties().stream())
                .collect(toImmutableList());

        Map<String, PropertyMetadata<?>> propertiesByName = new ConcurrentHashMap<>();
        for (PropertyMetadata<?> property : allProperties) {
            requireNonNull(property, "sessionProperty is null");
            // TODO: Implement fail fast in case of duplicate entries.
            propertiesByName.put(property.getName(), property);
        }
        return propertiesByName;
    }

    /**
     * Returns the entries of {@code dir} as an immutable list, or an empty list when
     * {@code dir} is null, is not a directory, or cannot be listed.
     */
    private static List<File> listFiles(File dir)
    {
        if (dir == null || !dir.isDirectory()) {
            return ImmutableList.of();
        }

        // File.listFiles() may return null even for a directory.
        File[] entries = dir.listFiles();
        if (entries == null) {
            return ImmutableList.of();
        }
        return ImmutableList.copyOf(entries);
    }

    /**
     * Lists every known session property together with its current value for the session.
     * The result contains, in order: system session properties (sorted by name), connector
     * session properties for each given catalog (sorted by catalog, then by property name),
     * and worker session properties (sorted by name).
     *
     * <p>Catalogs whose connector has no registered session properties are skipped.
     *
     * @param session session supplying the explicitly set property values
     * @param catalogs catalog name to connector id mapping for the catalogs to include
     * @return the session property values; a property that is not set reports its default
     * @throws NullPointerException if {@code session} or {@code catalogs} is null
     */
    public List<SessionPropertyValue> getAllSessionProperties(Session session, Map<String, ConnectorId> catalogs)
    {
        requireNonNull(session, "session is null");
        requireNonNull(catalogs, "catalogs is null");

        ImmutableList.Builder<SessionPropertyValue> sessionPropertyValues = ImmutableList.builder();
        Map<String, String> systemProperties = session.getSystemProperties();

        // TreeMap copies give a stable, name-sorted iteration order.
        for (PropertyMetadata<?> property : new TreeMap<>(systemSessionProperties).values()) {
            sessionPropertyValues.add(toSessionPropertyValue(property, systemProperties, Optional.empty()));
        }

        for (Entry<String, ConnectorId> entry : new TreeMap<>(catalogs).entrySet()) {
            String catalog = entry.getKey();
            ConnectorId connectorId = entry.getValue();

            // The connector may have no registered properties (or may have been removed
            // concurrently); skip it instead of failing with a NullPointerException.
            Map<String, PropertyMetadata<?>> registeredProperties = connectorSessionProperties.get(connectorId);
            if (registeredProperties == null) {
                continue;
            }

            Map<String, String> connectorProperties = session.getConnectorProperties(connectorId);
            for (PropertyMetadata<?> property : new TreeMap<>(registeredProperties).values()) {
                sessionPropertyValues.add(toSessionPropertyValue(property, connectorProperties, Optional.of(catalog)));
            }
        }

        // Worker properties are set through the system property namespace of the session.
        for (PropertyMetadata<?> property : new TreeMap<>(memoizedWorkerSessionProperties.get()).values()) {
            sessionPropertyValues.add(toSessionPropertyValue(property, systemProperties, Optional.empty()));
        }
        return sessionPropertyValues.build();
    }

    /**
     * Builds the displayable value of one property: the explicitly set value if present,
     * otherwise the default (an empty string when the default is null). The fully
     * qualified name is {@code catalog.property} when a catalog is given, and the bare
     * property name otherwise.
     */
    private static SessionPropertyValue toSessionPropertyValue(PropertyMetadata<?> property, Map<String, String> explicitValues, Optional<String> catalogName)
    {
        String propertyName = property.getName();
        String defaultValue = firstNonNull(property.getDefaultValue(), "").toString();
        String value = explicitValues.getOrDefault(propertyName, defaultValue);
        String fullyQualifiedName = catalogName
                .map(catalog -> catalog + "." + propertyName)
                .orElse(propertyName);

        return new SessionPropertyValue(
                value,
                defaultValue,
                fullyQualifiedName,
                catalogName,
                propertyName,
                property.getDescription(),
                property.getSqlType().getDisplayName(),
                property.isHidden());
    }

    /**
     * Decodes the string form of a system (or worker) session property into its Java value.
     *
     * @param name the property name
     * @param value the serialized value, or null to obtain the property's default
     * @param type the expected Java type; must be the property's declared Java type
     * @return the decoded value
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the property is
     *         unknown, the type does not match, or the value cannot be decoded
     */
    public <T> T decodeSystemPropertyValue(String name, @Nullable String value, Class<T> type)
    {
        Optional<PropertyMetadata<?>> lookup = getSystemSessionPropertyMetadata(name);
        if (!lookup.isPresent()) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Unknown session property " + name);
        }
        PropertyMetadata<?> metadata = lookup.get();
        return decodePropertyValue(name, value, type, metadata);
    }

    /**
     * Decodes the string form of a connector session property into its Java value.
     *
     * @param connectorId connector that owns the property
     * @param catalogName catalog name, used only to build the name shown in error messages
     * @param propertyName unqualified property name
     * @param propertyValue the serialized value, or null to obtain the property's default
     * @param type the expected Java type; must be the property's declared Java type
     * @return the decoded value
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the connector or
     *         property is unknown, the type does not match, or the value cannot be decoded
     */
    public <T> T decodeCatalogPropertyValue(ConnectorId connectorId, String catalogName, String propertyName, @Nullable String propertyValue, Class<T> type)
    {
        String qualifiedName = catalogName + "." + propertyName;
        Optional<PropertyMetadata<?>> lookup = getConnectorSessionPropertyMetadata(connectorId, propertyName);
        if (!lookup.isPresent()) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Unknown session property " + qualifiedName);
        }
        PropertyMetadata<?> metadata = lookup.get();
        return decodePropertyValue(qualifiedName, propertyValue, type, metadata);
    }

    /**
     * Validates a system (or worker) session property by decoding the value against the
     * property's own declared Java type; the decoded result is discarded.
     *
     * @param propertyName the property name
     * @param propertyValue the serialized value to validate
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the property is
     *         unknown or the value cannot be decoded
     */
    public void validateSystemSessionProperty(String propertyName, String propertyValue)
    {
        Optional<PropertyMetadata<?>> lookup = getSystemSessionPropertyMetadata(propertyName);
        if (!lookup.isPresent()) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Unknown session property " + propertyName);
        }
        PropertyMetadata<?> metadata = lookup.get();
        decodePropertyValue(propertyName, propertyValue, metadata.getJavaType(), metadata);
    }

    /**
     * Validates a connector session property by decoding the value against the property's
     * own declared Java type; the decoded result is discarded.
     *
     * @param connectorId connector that owns the property
     * @param catalogName catalog name, used only to build the name shown in error messages
     * @param propertyName unqualified property name
     * @param propertyValue the serialized value to validate
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the connector or
     *         property is unknown or the value cannot be decoded
     */
    public void validateCatalogSessionProperty(ConnectorId connectorId, String catalogName, String propertyName, String propertyValue)
    {
        String qualifiedName = catalogName + "." + propertyName;
        Optional<PropertyMetadata<?>> lookup = getConnectorSessionPropertyMetadata(connectorId, propertyName);
        if (!lookup.isPresent()) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Unknown session property " + qualifiedName);
        }
        PropertyMetadata<?> metadata = lookup.get();
        decodePropertyValue(qualifiedName, propertyValue, metadata.getJavaType(), metadata);
    }

    /**
     * Converts a serialized property value into the requested Java type using the
     * property's metadata.
     *
     * @param fullPropertyName name used in error messages
     * @param propertyValue the serialized value; null yields the property's default value
     * @param type requested Java type; must be the exact class declared by {@code metadata}
     * @param metadata the property's metadata
     * @return the decoded value, or the default when {@code propertyValue} is null
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} on a type mismatch or
     *         when deserialization or decoding fails
     */
    private static <T> T decodePropertyValue(String fullPropertyName, @Nullable String propertyValue, Class<T> type, PropertyMetadata<?> metadata)
    {
        Class<?> declaredType = metadata.getJavaType();
        if (declaredType != type) {
            String message = format("Property %s is type %s, but requested type was %s", fullPropertyName,
                    declaredType.getName(),
                    type.getName());
            throw new PrestoException(INVALID_SESSION_PROPERTY, message);
        }

        if (propertyValue == null) {
            return type.cast(metadata.getDefaultValue());
        }

        try {
            Object sqlValue = deserializeSessionProperty(metadata.getSqlType(), propertyValue);
            Object decoded = metadata.decode(sqlValue);
            return type.cast(decoded);
        }
        catch (Exception e) {
            // PrestoExceptions already carry a meaningful error code; pass them through untouched.
            if (e instanceof PrestoException) {
                throw (PrestoException) e;
            }
            // the system property decoder can throw any exception
            throw new PrestoException(INVALID_SESSION_PROPERTY, format("%s is invalid: %s", fullPropertyName, propertyValue), e);
        }
    }

    /**
     * Evaluates a constant expression to the object representation of {@code expectedType}.
     * Parameters in the expression are substituted before evaluation.
     *
     * @param expression the expression to evaluate
     * @param expectedType SQL type the expression is evaluated as
     * @param session session used for evaluation and for producing the object value
     * @param metadata metadata used for evaluation
     * @param parameters parameter substitutions
     * @return the non-null object value
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the value is null
     */
    public static Object evaluatePropertyValue(Expression expression, Type expectedType, Session session, Metadata metadata, Map<NodeRef<Parameter>, Expression> parameters)
    {
        ParameterRewriter parameterRewriter = new ParameterRewriter(parameters);
        Expression withParametersBound = ExpressionTreeRewriter.rewriteWith(parameterRewriter, expression);
        Object nativeValue = evaluateConstantExpression(withParametersBound, expectedType, metadata, session, parameters);

        // convert to object value type of SQL type: write the native value into a
        // one-entry block and read it back as an object
        BlockBuilder singleValueBlock = expectedType.createBlockBuilder(null, 1);
        writeNativeValue(expectedType, singleValueBlock, nativeValue);
        Object result = expectedType.getObjectValue(session.getSqlFunctionProperties(), singleValueBlock, 0);

        if (result != null) {
            return result;
        }
        throw new PrestoException(INVALID_SESSION_PROPERTY, "Session property value must not be null");
    }

    /**
     * Serializes a session property value to its string form. Boolean, bigint, integer,
     * double, varchar and tinyint values use {@code toString()}; array and map values are
     * written as JSON.
     *
     * @param type SQL type of the property
     * @param value the value to serialize
     * @return the serialized value
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if {@code value} is
     *         null or {@code type} is not supported
     */
    public static String serializeSessionProperty(Type type, Object value)
    {
        if (value == null) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Session property can not be null");
        }

        boolean isScalar = BooleanType.BOOLEAN.equals(type)
                || BigintType.BIGINT.equals(type)
                || IntegerType.INTEGER.equals(type)
                || DoubleType.DOUBLE.equals(type)
                || VarcharType.VARCHAR.equals(type)
                || TinyintType.TINYINT.equals(type);
        if (isScalar) {
            return value.toString();
        }

        boolean isContainer = type instanceof ArrayType || type instanceof MapType;
        if (isContainer) {
            return getJsonCodecForType(type).toJson(value);
        }

        throw new PrestoException(INVALID_SESSION_PROPERTY, format("Session property type %s is not supported", type));
    }

    /**
     * Parses the string form of a session property into the object representation of its
     * SQL type. Array and map values are read as JSON; the supported scalar types are
     * parsed with the {@code valueOf} method of the matching Java wrapper class, and
     * varchar is returned as is.
     *
     * @param type SQL type of the property
     * @param value the serialized value
     * @return the parsed object
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if {@code value} is
     *         null or {@code type} is not supported
     */
    private static Object deserializeSessionProperty(Type type, String value)
    {
        if (value == null) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, "Session property can not be null");
        }

        // Container types are JSON-encoded.
        if (type instanceof ArrayType || type instanceof MapType) {
            return getJsonCodecForType(type).fromJson(value);
        }

        if (VarcharType.VARCHAR.equals(type)) {
            return value;
        }
        else if (BooleanType.BOOLEAN.equals(type)) {
            return Boolean.valueOf(value);
        }
        else if (TinyintType.TINYINT.equals(type)) {
            return Byte.valueOf(value);
        }
        else if (IntegerType.INTEGER.equals(type)) {
            return Integer.valueOf(value);
        }
        else if (BigintType.BIGINT.equals(type)) {
            return Long.valueOf(value);
        }
        else if (DoubleType.DOUBLE.equals(type)) {
            return Double.valueOf(value);
        }

        throw new PrestoException(INVALID_SESSION_PROPERTY, format("Session property type %s is not supported", type));
    }

    /**
     * Returns a JSON codec for values of the given SQL type. Array and map types are
     * handled recursively through their element, key and value types.
     *
     * @param type SQL type to obtain a codec for
     * @return a codec for the Java representation of {@code type}
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the type (or a
     *         nested type) is not supported
     */
    @SuppressWarnings("unchecked") // the codec's value class is chosen from the SQL type at runtime
    private static <T> JsonCodec<T> getJsonCodecForType(Type type)
    {
        // Container types first: their codecs are composed from the nested types' codecs.
        if (type instanceof ArrayType) {
            ArrayType arrayType = (ArrayType) type;
            Type itemType = arrayType.getElementType();
            return (JsonCodec<T>) JSON_CODEC_FACTORY.listJsonCodec(getJsonCodecForType(itemType));
        }
        if (type instanceof MapType) {
            MapType mapType = (MapType) type;
            Type mapKeyType = mapType.getKeyType();
            Type mapValueType = mapType.getValueType();
            return (JsonCodec<T>) JSON_CODEC_FACTORY.mapJsonCodec(getMapKeyType(mapKeyType), getJsonCodecForType(mapValueType));
        }

        if (VarcharType.VARCHAR.equals(type)) {
            return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(String.class);
        }
        else if (BooleanType.BOOLEAN.equals(type)) {
            return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Boolean.class);
        }
        else if (TinyintType.TINYINT.equals(type)) {
            return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Byte.class);
        }
        else if (IntegerType.INTEGER.equals(type)) {
            return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Integer.class);
        }
        else if (BigintType.BIGINT.equals(type)) {
            return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Long.class);
        }
        else if (DoubleType.DOUBLE.equals(type)) {
            return (JsonCodec<T>) JSON_CODEC_FACTORY.jsonCodec(Double.class);
        }

        throw new PrestoException(INVALID_SESSION_PROPERTY, format("Session property type %s is not supported", type));
    }

    /**
     * Maps a SQL type used as a map key to the Java class its keys are decoded as.
     *
     * @param type SQL type of the map key
     * @return the Java class for keys of that type
     * @throws PrestoException with {@code INVALID_SESSION_PROPERTY} if the key type is not
     *         one of varchar, boolean, bigint, integer, double or tinyint
     */
    private static Class<?> getMapKeyType(Type type)
    {
        Class<?> keyClass = null;
        if (VarcharType.VARCHAR.equals(type)) {
            keyClass = String.class;
        }
        else if (BooleanType.BOOLEAN.equals(type)) {
            keyClass = Boolean.class;
        }
        else if (BigintType.BIGINT.equals(type)) {
            keyClass = Long.class;
        }
        else if (IntegerType.INTEGER.equals(type)) {
            keyClass = Integer.class;
        }
        else if (DoubleType.DOUBLE.equals(type)) {
            keyClass = Double.class;
        }
        else if (TinyintType.TINYINT.equals(type)) {
            keyClass = Byte.class;
        }

        if (keyClass == null) {
            throw new PrestoException(INVALID_SESSION_PROPERTY, format("Session property map key type %s is not supported", type));
        }
        return keyClass;
    }

    /**
     * Immutable snapshot of one session property as reported by
     * {@code getAllSessionProperties}: its names, description, SQL type display name,
     * current and default values (both as strings), and hidden flag.
     */
    public static class SessionPropertyValue
    {
        private final String value;
        private final String defaultValue;
        private final String fullyQualifiedName;
        private final Optional<String> catalogName;
        private final String propertyName;
        private final String description;
        private final String type;
        private final boolean hidden;

        private SessionPropertyValue(String value,
                String defaultValue,
                String fullyQualifiedName,
                Optional<String> catalogName,
                String propertyName,
                String description,
                String type,
                boolean hidden)
        {
            // Assigned in parameter order.
            this.value = value;
            this.defaultValue = defaultValue;
            this.fullyQualifiedName = fullyQualifiedName;
            this.catalogName = catalogName;
            this.propertyName = propertyName;
            this.description = description;
            this.type = type;
            this.hidden = hidden;
        }

        /** Property name, prefixed with {@code catalog.} for connector properties. */
        public String getFullyQualifiedName()
        {
            return this.fullyQualifiedName;
        }

        /** Owning catalog; empty for system and worker properties. */
        public Optional<String> getCatalogName()
        {
            return this.catalogName;
        }

        /** Unqualified property name. */
        public String getPropertyName()
        {
            return this.propertyName;
        }

        /** Description taken from the property metadata. */
        public String getDescription()
        {
            return this.description;
        }

        /** Display name of the property's SQL type. */
        public String getType()
        {
            return this.type;
        }

        /** Value set in the session, or the default when the property is not set. */
        public String getValue()
        {
            return this.value;
        }

        /** Default value as a string; empty when the property has no default. */
        public String getDefaultValue()
        {
            return this.defaultValue;
        }

        /** Whether the property metadata marks this property as hidden. */
        public boolean isHidden()
        {
            return this.hidden;
        }
    }
}