PinotConfig.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigSecuritySensitive;
import com.google.common.base.Splitter;
import com.google.common.base.Splitter.MapSplitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
import io.airlift.units.MinDuration;

import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

import static com.facebook.presto.pinot.PinotErrorCode.PINOT_INVALID_CONFIGURATION;
import static com.facebook.presto.pinot.PinotPushdownUtils.PINOT_DISTINCT_COUNT_FUNCTION_NAME;
import static com.google.common.base.Preconditions.checkArgument;
import static io.airlift.units.DataSize.Unit.MEGABYTE;

public class PinotConfig
{
    public static final int DEFAULT_LIMIT_LARGE_FOR_SEGMENT = Integer.MAX_VALUE;
    public static final int DEFAULT_NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES = 25_000;
    public static final int DEFAULT_STREAMING_SERVER_GRPC_MAX_INBOUND_MESSAGE_BYTES = (int) new DataSize(128, MEGABYTE).toBytes();

    // There is a perf penalty of having a large topN since the structures are allocated to this size
    // So size this judiciously
    public static final int DEFAULT_TOPN_LARGE = 10_000;
    public static final int DEFAULT_PROXY_GRPC_PORT = 8124;

    public static final String DEFAULT_GRPC_TLS_STORE_TYPE = "PKCS12";

    private static final Duration DEFAULT_CONNECTION_TIMEOUT = new Duration(1, TimeUnit.MINUTES);
    private static final int DEFAULT_ESTIMATED_SIZE_IN_BYTES_FOR_NON_NUMERIC_COLUMN = 20;

    private static final Splitter LIST_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings();
    private static final MapSplitter MAP_SPLITTER = Splitter.on(",").trimResults().omitEmptyStrings().withKeyValueSeparator(":");

    private String controllerRestService;
    private String serviceHeaderParam = "RPC-Service";
    private String callerHeaderValue = "presto";
    private String callerHeaderParam = "RPC-Caller";

    private List<String> controllerUrls = ImmutableList.of();
    private String restProxyServiceForQuery;

    private int limitLargeForSegment = DEFAULT_LIMIT_LARGE_FOR_SEGMENT;
    private int topNLarge = DEFAULT_TOPN_LARGE;

    private Duration connectionTimeout = DEFAULT_CONNECTION_TIMEOUT;

    private int estimatedSizeInBytesForNonNumericColumn = DEFAULT_ESTIMATED_SIZE_IN_BYTES_FOR_NON_NUMERIC_COLUMN;
    private Map<String, String> extraHttpHeaders = ImmutableMap.of();
    private Duration metadataCacheExpiry = new Duration(2, TimeUnit.MINUTES);

    private boolean forbidBrokerQueries;
    private boolean forbidSegmentQueries;
    private boolean attemptBrokerQueries;

    // Infer Pinot time fields to Presto Date/Timestamp type
    private boolean inferDateTypeInSchema = true;
    private boolean inferTimestampTypeInSchema = true;

    // Retry on data fetch exceptions, e.g. timeout from the server side.
    private boolean markDataFetchExceptionsAsRetriable = true;

    // Requires Pinot version >= 0.6.0.
    private int streamingServerGrpcMaxInboundMessageBytes = DEFAULT_STREAMING_SERVER_GRPC_MAX_INBOUND_MESSAGE_BYTES;

    private int numSegmentsPerSplit = 1;
    private boolean ignoreEmptyResponses;
    private int fetchRetryCount = 2;
    private boolean useDateTrunc;
    private int nonAggregateLimitForBrokerQueries = DEFAULT_NON_AGGREGATE_LIMIT_FOR_BROKER_QUERIES;
    private boolean pushdownTopNBrokerQueries = true;
    private boolean pushdownProjectExpressions = true;
    private String grpcHost;
    private int grpcPort = DEFAULT_PROXY_GRPC_PORT;
    private boolean useProxy;
    private boolean useSecureConnection;
    private Map<String, String> extraGrpcMetadata = ImmutableMap.of();
    private String overrideDistinctCountFunction = PINOT_DISTINCT_COUNT_FUNCTION_NAME;
    private String grpcTlsKeyStorePath;
    private String grpcTlsKeyStorePassword;
    private String grpcTlsKeyStoreType = DEFAULT_GRPC_TLS_STORE_TYPE;
    private String grpcTlsTrustStorePath;
    private String grpcTlsTrustStorePassword;
    private String grpcTlsTrustStoreType = DEFAULT_GRPC_TLS_STORE_TYPE;

    private String controllerAuthenticationType = "NONE";
    private String controllerAuthenticationUser;
    private String controllerAuthenticationPassword;
    private String brokerAuthenticationType = "NONE";
    private String brokerAuthenticationUser;
    private String brokerAuthenticationPassword;

    private String queryOptions;

    @NotNull
    public Map<String, String> getExtraHttpHeaders()
    {
        return extraHttpHeaders;
    }

    @Config("pinot.extra-http-headers")
    public PinotConfig setExtraHttpHeaders(String headers)
    {
        extraHttpHeaders = ImmutableMap.copyOf(MAP_SPLITTER.split(headers));
        return this;
    }

    @NotNull
    public Map<String, String> getExtraGrpcMetadata()
    {
        return extraGrpcMetadata;
    }

    @Config("pinot.extra-grpc-metadata")
    public PinotConfig setExtraGrpcMetadata(String metadata)
    {
        extraGrpcMetadata = ImmutableMap.copyOf(MAP_SPLITTER.split(metadata));
        return this;
    }

    @NotNull
    public List<String> getControllerUrls()
    {
        return controllerUrls;
    }

    @Config("pinot.controller-urls")
    public PinotConfig setControllerUrls(String controllerUrl)
    {
        this.controllerUrls = LIST_SPLITTER.splitToList(controllerUrl);
        return this;
    }

    @Nullable
    public String getControllerRestService()
    {
        return controllerRestService;
    }

    @Config("pinot.controller-rest-service")
    public PinotConfig setControllerRestService(String controllerRestService)
    {
        this.controllerRestService = controllerRestService;
        return this;
    }

    @NotNull
    public int getLimitLargeForSegment()
    {
        return limitLargeForSegment;
    }

    @Config("pinot.limit-large-for-segment")
    public PinotConfig setLimitLargeForSegment(int limitLargeForSegment)
    {
        this.limitLargeForSegment = limitLargeForSegment;
        return this;
    }

    @NotNull
    public int getTopNLarge()
    {
        return topNLarge;
    }

    @Config("pinot.topn-large")
    public PinotConfig setTopNLarge(int topNLarge)
    {
        this.topNLarge = topNLarge;
        return this;
    }

    @MinDuration("0s")
    @NotNull
    public Duration getMetadataCacheExpiry()
    {
        return metadataCacheExpiry;
    }

    @Config("pinot.metadata-expiry")
    public PinotConfig setMetadataCacheExpiry(Duration metadataCacheExpiry)
    {
        this.metadataCacheExpiry = metadataCacheExpiry;
        return this;
    }

    @NotNull
    public int getEstimatedSizeInBytesForNonNumericColumn()
    {
        return estimatedSizeInBytesForNonNumericColumn;
    }

    @Config("pinot.estimated-size-in-bytes-for-non-numeric-column")
    public PinotConfig setEstimatedSizeInBytesForNonNumericColumn(int estimatedSizeInBytesForNonNumericColumn)
    {
        this.estimatedSizeInBytesForNonNumericColumn = estimatedSizeInBytesForNonNumericColumn;
        return this;
    }

    @NotNull
    public String getServiceHeaderParam()
    {
        return serviceHeaderParam;
    }

    @Config("pinot.service-header-param")
    public PinotConfig setServiceHeaderParam(String serviceHeaderParam)
    {
        this.serviceHeaderParam = serviceHeaderParam;
        return this;
    }

    @NotNull
    public String getCallerHeaderValue()
    {
        return callerHeaderValue;
    }

    @Config("pinot.caller-header-value")
    public PinotConfig setCallerHeaderValue(String callerHeaderValue)
    {
        this.callerHeaderValue = callerHeaderValue;
        return this;
    }

    @NotNull
    public String getCallerHeaderParam()
    {
        return callerHeaderParam;
    }

    @Config("pinot.caller-header-param")
    public PinotConfig setCallerHeaderParam(String callerHeaderParam)
    {
        this.callerHeaderParam = callerHeaderParam;
        return this;
    }

    public boolean isForbidBrokerQueries()
    {
        return forbidBrokerQueries;
    }

    @Config("pinot.forbid-broker-queries")
    public PinotConfig setForbidBrokerQueries(boolean forbidBrokerQueries)
    {
        this.forbidBrokerQueries = forbidBrokerQueries;
        return this;
    }

    public boolean isForbidSegmentQueries()
    {
        return forbidSegmentQueries;
    }

    @Config("pinot.forbid-segment-queries")
    public PinotConfig setForbidSegmentQueries(boolean forbidSegmentQueries)
    {
        this.forbidSegmentQueries = forbidSegmentQueries;
        return this;
    }

    public boolean isAttemptBrokerQueries()
    {
        return attemptBrokerQueries;
    }

    @Config("pinot.attempt-broker-queries")
    public PinotConfig setAttemptBrokerQueries(boolean attemptBrokerQueries)
    {
        this.attemptBrokerQueries = attemptBrokerQueries;
        return this;
    }

    @Nullable
    public String getRestProxyServiceForQuery()
    {
        return restProxyServiceForQuery;
    }

    @Config("pinot.rest-proxy-service-for-query")
    public PinotConfig setRestProxyServiceForQuery(String restProxyServiceForQuery)
    {
        this.restProxyServiceForQuery = restProxyServiceForQuery;
        return this;
    }

    public boolean isUseDateTrunc()
    {
        return useDateTrunc;
    }

    @Config("pinot.use-date-trunc")
    public PinotConfig setUseDateTrunc(boolean useDateTrunc)
    {
        this.useDateTrunc = useDateTrunc;
        return this;
    }

    public int getNumSegmentsPerSplit()
    {
        return this.numSegmentsPerSplit;
    }

    @Config("pinot.num-segments-per-split")
    public PinotConfig setNumSegmentsPerSplit(int numSegmentsPerSplit)
    {
        checkArgument(numSegmentsPerSplit > 0, "Number of segments per split must be more than zero");
        this.numSegmentsPerSplit = numSegmentsPerSplit;
        return this;
    }

    public int getFetchRetryCount()
    {
        return fetchRetryCount;
    }

    @Config("pinot.fetch-retry-count")
    public PinotConfig setFetchRetryCount(int fetchRetryCount)
    {
        this.fetchRetryCount = fetchRetryCount;
        return this;
    }

    public int getNonAggregateLimitForBrokerQueries()
    {
        return nonAggregateLimitForBrokerQueries;
    }

    @Config("pinot.non-aggregate-limit-for-broker-queries")
    public PinotConfig setNonAggregateLimitForBrokerQueries(int nonAggregateLimitForBrokerQueries)
    {
        this.nonAggregateLimitForBrokerQueries = nonAggregateLimitForBrokerQueries;
        return this;
    }

    public boolean isInferDateTypeInSchema()
    {
        return inferDateTypeInSchema;
    }

    @Config("pinot.infer-date-type-in-schema")
    public PinotConfig setInferDateTypeInSchema(boolean inferDateTypeInSchema)
    {
        this.inferDateTypeInSchema = inferDateTypeInSchema;
        return this;
    }

    public boolean isInferTimestampTypeInSchema()
    {
        return inferTimestampTypeInSchema;
    }

    @Config("pinot.infer-timestamp-type-in-schema")
    public PinotConfig setInferTimestampTypeInSchema(boolean inferTimestampTypeInSchema)
    {
        this.inferTimestampTypeInSchema = inferTimestampTypeInSchema;
        return this;
    }

    public boolean isMarkDataFetchExceptionsAsRetriable()
    {
        return markDataFetchExceptionsAsRetriable;
    }

    @Config("pinot.mark-data-fetch-exceptions-as-retriable")
    public PinotConfig setMarkDataFetchExceptionsAsRetriable(boolean markDataFetchExceptionsAsRetriable)
    {
        this.markDataFetchExceptionsAsRetriable = markDataFetchExceptionsAsRetriable;
        return this;
    }

    public boolean isPushdownTopNBrokerQueries()
    {
        return pushdownTopNBrokerQueries;
    }

    // This is used to switch on/off push down Pinot broker queries with ORDER BY clause and LIMIT.
    // The reason is that presto doesn't retain the order of query response from Pinot for large number of records returned.
    @Config("pinot.pushdown-topn-broker-queries")
    public PinotConfig setPushdownTopNBrokerQueries(boolean pushdownTopNBrokerQueries)
    {
        this.pushdownTopNBrokerQueries = pushdownTopNBrokerQueries;
        return this;
    }

    public boolean isPushdownProjectExpressions()
    {
        return pushdownProjectExpressions;
    }

    @Config("pinot.pushdown-project-expressions")
    public PinotConfig setPushdownProjectExpressions(boolean pushdownProjectExpressions)
    {
        this.pushdownProjectExpressions = pushdownProjectExpressions;
        return this;
    }

    public int getStreamingServerGrpcMaxInboundMessageBytes()
    {
        return streamingServerGrpcMaxInboundMessageBytes;
    }

    @Config("pinot.streaming-server-grpc-max-inbound-message-bytes")
    public PinotConfig setStreamingServerGrpcMaxInboundMessageBytes(int streamingServerGrpcMaxInboundMessageBytes)
    {
        this.streamingServerGrpcMaxInboundMessageBytes = streamingServerGrpcMaxInboundMessageBytes;
        return this;
    }

    public boolean isUseProxy()
    {
        return this.useProxy;
    }

    @Config("pinot.proxy-enabled")
    public PinotConfig setUseProxy(boolean useProxy)
    {
        this.useProxy = useProxy;
        return this;
    }

    public String getGrpcHost()
    {
        return grpcHost;
    }

    @Config("pinot.grpc-host")
    public PinotConfig setGrpcHost(String grpcHost)
    {
        this.grpcHost = grpcHost;
        return this;
    }

    public int getGrpcPort()
    {
        return grpcPort;
    }

    @Config("pinot.grpc-port")
    public PinotConfig setGrpcPort(int grpcPort)
    {
        this.grpcPort = grpcPort;
        return this;
    }

    public boolean isUseSecureConnection()
    {
        return this.useSecureConnection;
    }

    @Config("pinot.secure-connection")
    public PinotConfig setUseSecureConnection(boolean useSecureConnection)
    {
        this.useSecureConnection = useSecureConnection;
        return this;
    }

    public String getOverrideDistinctCountFunction()
    {
        return this.overrideDistinctCountFunction;
    }

    @Config("pinot.override-distinct-count-function")
    public PinotConfig setOverrideDistinctCountFunction(String overrideDistinctCountFunction)
    {
        this.overrideDistinctCountFunction = overrideDistinctCountFunction;
        return this;
    }

    public String getGrpcTlsKeyStorePath()
    {
        return grpcTlsKeyStorePath;
    }

    @Config("pinot.grpc-tls-key-store-path")
    public PinotConfig setGrpcTlsKeyStorePath(String grpcTlsKeyStorePath)
    {
        this.grpcTlsKeyStorePath = grpcTlsKeyStorePath;
        return this;
    }

    public String getGrpcTlsKeyStorePassword()
    {
        return grpcTlsKeyStorePassword;
    }

    @Config("pinot.grpc-tls-key-store-password")
    @ConfigSecuritySensitive
    public PinotConfig setGrpcTlsKeyStorePassword(String grpcTlsKeyStorePassword)
    {
        this.grpcTlsKeyStorePassword = grpcTlsKeyStorePassword;
        return this;
    }

    public String getGrpcTlsKeyStoreType()
    {
        return grpcTlsKeyStoreType;
    }

    @Config("pinot.grpc-tls-key-store-type")
    public PinotConfig setGrpcTlsKeyStoreType(String grpcTlsKeyStoreType)
    {
        this.grpcTlsKeyStoreType = grpcTlsKeyStoreType;
        return this;
    }

    public String getGrpcTlsTrustStorePath()
    {
        return grpcTlsTrustStorePath;
    }

    @Config("pinot.grpc-tls-trust-store-path")
    public PinotConfig setGrpcTlsTrustStorePath(String grpcTlsTrustStorePath)
    {
        this.grpcTlsTrustStorePath = grpcTlsTrustStorePath;
        return this;
    }

    public String getGrpcTlsTrustStorePassword()
    {
        return grpcTlsTrustStorePassword;
    }

    @Config("pinot.grpc-tls-trust-store-password")
    @ConfigSecuritySensitive
    public PinotConfig setGrpcTlsTrustStorePassword(String grpcTlsTrustStorePassword)
    {
        this.grpcTlsTrustStorePassword = grpcTlsTrustStorePassword;
        return this;
    }

    public String getGrpcTlsTrustStoreType()
    {
        return grpcTlsTrustStoreType;
    }

    @Config("pinot.grpc-tls-trust-store-type")
    public PinotConfig setGrpcTlsTrustStoreType(String grpcTlsTrustStoreType)
    {
        this.grpcTlsTrustStoreType = grpcTlsTrustStoreType;
        return this;
    }

    @NotNull
    public String getControllerAuthenticationType()
    {
        return controllerAuthenticationType;
    }

    @Config("pinot.controller-authentication-type")
    public PinotConfig setControllerAuthenticationType(String controllerAuthenticationType)
    {
        this.controllerAuthenticationType = controllerAuthenticationType;
        return this;
    }

    public String getControllerAuthenticationUser()
    {
        return controllerAuthenticationUser;
    }

    @Config("pinot.controller-authentication-user")
    public PinotConfig setControllerAuthenticationUser(String controllerAuthenticationUser)
    {
        this.controllerAuthenticationUser = controllerAuthenticationUser;
        return this;
    }

    public String getControllerAuthenticationPassword()
    {
        return controllerAuthenticationPassword;
    }

    @Config("pinot.controller-authentication-password")
    @ConfigSecuritySensitive
    public PinotConfig setControllerAuthenticationPassword(String controllerAuthenticationPassword)
    {
        this.controllerAuthenticationPassword = controllerAuthenticationPassword;
        return this;
    }

    @NotNull
    public String getBrokerAuthenticationType()
    {
        return brokerAuthenticationType;
    }

    @Config("pinot.broker-authentication-type")
    public PinotConfig setBrokerAuthenticationType(String brokerAuthenticationType)
    {
        this.brokerAuthenticationType = brokerAuthenticationType;
        return this;
    }

    public String getBrokerAuthenticationUser()
    {
        return brokerAuthenticationUser;
    }

    @Config("pinot.broker-authentication-user")
    public PinotConfig setBrokerAuthenticationUser(String brokerAuthenticationUser)
    {
        this.brokerAuthenticationUser = brokerAuthenticationUser;
        return this;
    }

    public String getBrokerAuthenticationPassword()
    {
        return brokerAuthenticationPassword;
    }

    @Config("pinot.broker-authentication-password")
    @ConfigSecuritySensitive
    public PinotConfig setBrokerAuthenticationPassword(String brokerAuthenticationPassword)
    {
        this.brokerAuthenticationPassword = brokerAuthenticationPassword;
        return this;
    }

    /**
     * Randomly select one controller from the property in pinot controller list.
     *
     * @return one URL string of pinot controller
     */
    public String getControllerUrl()
    {
        List<String> controllerUrls = getControllerUrls();
        if (controllerUrls.isEmpty()) {
            throw new PinotException(PINOT_INVALID_CONFIGURATION, Optional.empty(), "No pinot controllers specified");
        }
        return controllerUrls.get(ThreadLocalRandom.current().nextInt(controllerUrls.size()));
    }

    public String getQueryOptions()
    {
        return queryOptions;
    }

    @Config("pinot.query-options")
    public PinotConfig setQueryOptions(String options)
    {
        queryOptions = options;
        return this;
    }
}