QueryPrerequisitesManager.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.dispatcher;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.prerequisites.QueryPrerequisites;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesContext;
import com.facebook.presto.spi.prerequisites.QueryPrerequisitesFactory;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.util.PropertiesUtil.loadProperties;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.lang.String.format;
import static java.util.Objects.requireNonNull;

public class QueryPrerequisitesManager
        implements QueryPrerequisites
{
    private static final Logger log = Logger.get(QueryPrerequisitesManager.class);

    private static final File QUERY_PREREQUISITES_CONFIG = new File("etc/query-prerequisites.properties");
    private static final String QUERY_PREREQUISITES_PROPERTY_NAME = "query-prerequisites.factory";
    private final Map<String, QueryPrerequisitesFactory> queryPrerequisitesFactories = new ConcurrentHashMap<>();
    private final QueryPrerequisites defaultQueryPrerequisites = new DefaultQueryPrerequisites();
    private final AtomicReference<QueryPrerequisites> queryPrerequisites = new AtomicReference<>(defaultQueryPrerequisites);

    public void addQueryPrerequisitesFactory(QueryPrerequisitesFactory queryPrerequisitesFactory)
    {
        requireNonNull(queryPrerequisitesFactory, "queryPrerequisitesFactory is null");

        if (queryPrerequisitesFactories.putIfAbsent(queryPrerequisitesFactory.getName(), queryPrerequisitesFactory) != null) {
            throw new IllegalArgumentException(format("Query Prerequisites '%s' is already registered", queryPrerequisitesFactory.getName()));
        }
    }

    public void loadQueryPrerequisites()
            throws Exception
    {
        if (QUERY_PREREQUISITES_CONFIG.exists()) {
            Map<String, String> properties = new HashMap<>(loadProperties(QUERY_PREREQUISITES_CONFIG));

            String factoryName = properties.remove(QUERY_PREREQUISITES_PROPERTY_NAME);
            checkArgument(!isNullOrEmpty(factoryName),
                    "Query Prerequisites configuration %s does not contain %s", QUERY_PREREQUISITES_CONFIG.getAbsoluteFile(), QUERY_PREREQUISITES_PROPERTY_NAME);

            log.info("-- Loading query prerequisites factory --");

            QueryPrerequisitesFactory queryPrerequisitesFactory = queryPrerequisitesFactories.get(factoryName);
            checkState(queryPrerequisitesFactory != null, "Query prerequisites factory %s is not registered", factoryName);

            QueryPrerequisites queryPrerequisites = queryPrerequisitesFactory.create(properties);
            checkState(this.queryPrerequisites.compareAndSet(defaultQueryPrerequisites, queryPrerequisites), "Query prerequisites has already been set");

            log.info("-- Loaded query prerequisites %s --", factoryName);
        }
    }

    @Override
    public CompletableFuture<?> waitForPrerequisites(QueryId queryId, QueryPrerequisitesContext context, WarningCollector warningCollector)
    {
        checkState(queryPrerequisites.get() != null, "Query prerequisites not initiated");
        return queryPrerequisites.get().waitForPrerequisites(queryId, context, warningCollector);
    }

    @Override
    public void queryFinished(QueryId queryId)
    {
        checkState(queryPrerequisites.get() != null, "Query prerequisites not initiated");
        queryPrerequisites.get().queryFinished(queryId);
    }
}