PinotProxyGrpcRequestBuilder.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot.query;

import com.facebook.presto.pinot.PinotErrorCode;
import com.facebook.presto.pinot.PinotException;
import org.apache.pinot.common.proto.Server;
import org.apache.pinot.common.utils.grpc.GrpcRequestBuilder;
import org.apache.pinot.spi.utils.CommonConstants;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class PinotProxyGrpcRequestBuilder
        extends GrpcRequestBuilder
{
    private static final String KEY_OF_PROXY_GRPC_FORWARD_HOST = "FORWARD_HOST";
    private static final String KEY_OF_PROXY_GRPC_FORWARD_PORT = "FORWARD_PORT";

    private String hostName;
    private int port = -1;
    private int requestId;
    private String brokerId = "unknown";
    private boolean enableTrace;
    private boolean enableStreaming;
    private String payloadType;
    private String sql;
    private List<String> segments;
    private Map<String, String> extraMetadata = new HashMap<>();

    public PinotProxyGrpcRequestBuilder setHostName(String hostName)
    {
        this.hostName = hostName;
        return this;
    }

    public PinotProxyGrpcRequestBuilder setPort(int port)
    {
        this.port = port;
        return this;
    }

    public PinotProxyGrpcRequestBuilder setRequestId(int requestId)
    {
        this.requestId = requestId;
        return this;
    }

    public PinotProxyGrpcRequestBuilder setBrokerId(String brokerId)
    {
        this.brokerId = brokerId;
        return this;
    }

    public PinotProxyGrpcRequestBuilder setEnableTrace(boolean enableTrace)
    {
        this.enableTrace = enableTrace;
        return this;
    }

    public PinotProxyGrpcRequestBuilder setEnableStreaming(boolean enableStreaming)
    {
        this.enableStreaming = enableStreaming;
        return this;
    }

    public PinotProxyGrpcRequestBuilder setSql(String sql)
    {
        payloadType = CommonConstants.Query.Request.PayloadType.SQL;
        this.sql = sql;
        return this;
    }

    public PinotProxyGrpcRequestBuilder addExtraMetadata(Map<String, String> extraMetadata)
    {
        this.extraMetadata.putAll(extraMetadata);
        return this;
    }

    public PinotProxyGrpcRequestBuilder setSegments(List<String> segments)
    {
        this.segments = segments;
        return this;
    }

    public Server.ServerRequest build()
    {
        if (payloadType == null || segments.isEmpty()) {
            throw new PinotException(PinotErrorCode.PINOT_INVALID_SEGMENT_QUERY_GENERATED, Optional.empty(), "Query and segmentsToQuery must be set");
        }
        if (!payloadType.equals(CommonConstants.Query.Request.PayloadType.SQL)) {
            throw new RuntimeException("Only [SQL] Payload type is allowed: " + payloadType);
        }
        Map<String, String> metadata = new HashMap<>();
        metadata.put(CommonConstants.Query.Request.MetadataKeys.REQUEST_ID, Integer.toString(requestId));
        metadata.put(CommonConstants.Query.Request.MetadataKeys.BROKER_ID, brokerId);
        metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_TRACE, Boolean.toString(enableTrace));
        metadata.put(CommonConstants.Query.Request.MetadataKeys.ENABLE_STREAMING, Boolean.toString(enableStreaming));
        metadata.put(CommonConstants.Query.Request.MetadataKeys.PAYLOAD_TYPE, payloadType);
        if (this.hostName != null) {
            metadata.put(KEY_OF_PROXY_GRPC_FORWARD_HOST, this.hostName);
        }
        if (this.port > 0) {
            metadata.put(KEY_OF_PROXY_GRPC_FORWARD_PORT, String.valueOf(this.port));
        }
        extraMetadata.forEach((k, v) -> metadata.put(k, v));
        return Server.ServerRequest.newBuilder()
            .putAllMetadata(metadata)
            .setSql(sql)
            .addAllSegments(segments)
            .build();
    }
}