PinotConnection.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot;

import com.facebook.presto.pinot.PinotClusterInfoFetcher.TimeBoundary;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import org.apache.pinot.spi.data.Schema;

import javax.inject.Inject;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.pinot.PinotErrorCode.PINOT_UNCLASSIFIED_ERROR;
import static com.google.common.cache.CacheLoader.asyncReloading;
import static java.util.Objects.requireNonNull;

public class PinotConnection
{
    private static final Object ALL_TABLES_CACHE_KEY = new Object();

    private final LoadingCache<String, List<PinotColumn>> pinotTableColumnCache;
    private final LoadingCache<Object, List<String>> allTablesCache;
    private final PinotConfig pinotConfig;
    private final PinotClusterInfoFetcher pinotClusterInfoFetcher;

    @Inject
    public PinotConnection(
            PinotClusterInfoFetcher pinotClusterInfoFetcher,
            PinotConfig pinotConfig,
            @ForPinot Executor executor)
    {
        this.pinotConfig = requireNonNull(pinotConfig, "pinot config");
        final long metadataCacheExpiryMillis = this.pinotConfig.getMetadataCacheExpiry().roundTo(TimeUnit.MILLISECONDS);
        this.pinotClusterInfoFetcher = requireNonNull(pinotClusterInfoFetcher, "cluster info fetcher is null");
        this.allTablesCache = CacheBuilder.newBuilder()
                .refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS)
                .build(asyncReloading(CacheLoader.from(pinotClusterInfoFetcher::getAllTables), executor));

        boolean nullHandlingEnabled = PinotQueryOptionsUtils.isNullHandlingEnabled(pinotConfig.getQueryOptions());
        this.pinotTableColumnCache =
                CacheBuilder.newBuilder()
                        .refreshAfterWrite(metadataCacheExpiryMillis, TimeUnit.MILLISECONDS)
                        .build(asyncReloading(new CacheLoader<String, List<PinotColumn>>()
                        {
                            @Override
                            public List<PinotColumn> load(String tableName)
                                    throws Exception
                            {
                                Schema tablePinotSchema = pinotClusterInfoFetcher.getTableSchema(tableName);
                                return PinotColumnUtils.getPinotColumnsForPinotSchema(tablePinotSchema, pinotConfig.isInferDateTypeInSchema(), pinotConfig.isInferTimestampTypeInSchema(), nullHandlingEnabled);
                            }
                        }, executor));

        executor.execute(() -> this.allTablesCache.refresh(ALL_TABLES_CACHE_KEY));
    }

    private static <K, V> V getFromCache(LoadingCache<K, V> cache, K key)
    {
        V value = cache.getIfPresent(key);
        if (value != null) {
            return value;
        }
        try {
            return cache.get(key);
        }
        catch (ExecutionException e) {
            throw new PinotException(PINOT_UNCLASSIFIED_ERROR, Optional.empty(), "Cannot fetch from cache " + key, e.getCause());
        }
    }

    public List<String> getTableNames()
    {
        return getFromCache(allTablesCache, ALL_TABLES_CACHE_KEY);
    }

    public PinotTable getTable(String tableName)
    {
        List<PinotColumn> columns = getPinotColumnsForTable(tableName);
        return new PinotTable(tableName, columns);
    }

    private List<PinotColumn> getPinotColumnsForTable(String tableName)
    {
        return getFromCache(pinotTableColumnCache, tableName);
    }

    public Map<String, Map<String, List<String>>> getRoutingTable(String tableName)
    {
        return pinotClusterInfoFetcher.getRoutingTableForTable(tableName);
    }

    public TimeBoundary getTimeBoundary(String tableName)
    {
        return pinotClusterInfoFetcher.getTimeBoundaryForTable(tableName);
    }

    public int getGrpcPort(String serverInstance)
    {
        return pinotClusterInfoFetcher.getGrpcPort(serverInstance);
    }
}