PinotSplit.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.pinot;

import com.facebook.presto.pinot.query.PinotQueryGenerator;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;

import java.util.List;
import java.util.Optional;

import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
import static java.util.Objects.requireNonNull;

/**
 * Split description for the Pinot connector, (de)serialized through the Jackson
 * annotations on the constructor and getters.
 * <p>
 * A split is one of two {@link SplitType}s:
 * <ul>
 * <li>{@link SplitType#SEGMENT}: requires {@code segmentPinotQuery}, a non-empty
 * {@code segments} list and {@code segmentHost};</li>
 * <li>any other type (currently only {@link SplitType#BROKER}): requires
 * {@code brokerPinotQuery}.</li>
 * </ul>
 * All fields are final and the list fields are stored as immutable copies.
 */
public class PinotSplit
        implements ConnectorSplit
{
    private final String connectorId;
    private final SplitType splitType;
    private final List<PinotColumnHandle> expectedColumnHandles;

    // Properties needed for broker split type
    private final Optional<PinotQueryGenerator.GeneratedPinotQuery> brokerPinotQuery;

    // Properties needed for segment split type
    private final Optional<String> segmentPinotQuery;
    private final List<String> segments;
    private final Optional<String> segmentHost;
    private final Optional<Integer> grpcPort;

    /**
     * Creates a split and validates that the properties required by {@code splitType} are present.
     * <p>
     * Every JSON property name used here matches the name produced by the corresponding
     * {@code @JsonProperty} getter, so a serialized split can be read back.
     *
     * @param connectorId id of the connector this split belongs to
     * @param splitType kind of split
     * @param expectedColumnHandles column handles expected from this split; copied into an immutable list
     * @param brokerPinotQuery query for a broker split; must be present unless {@code splitType} is {@code SEGMENT}
     * @param segmentPinotQuery query for a segment split; must be present when {@code splitType} is {@code SEGMENT}
     * @param segments segment names; must be non-empty when {@code splitType} is {@code SEGMENT}
     * @param segmentHost host string for a segment split; must be present when {@code splitType} is {@code SEGMENT}
     * @param grpcPort optional gRPC port
     * @throws NullPointerException if any argument is null
     * @throws IllegalArgumentException if a property required by {@code splitType} is missing
     */
    @JsonCreator
    public PinotSplit(
            @JsonProperty("connectorId") String connectorId,
            @JsonProperty("splitType") SplitType splitType,
            @JsonProperty("expectedColumnHandles") List<PinotColumnHandle> expectedColumnHandles,
            // Must be "brokerPinotQuery": that is the name getBrokerPinotQuery() is serialized under.
            @JsonProperty("brokerPinotQuery") Optional<PinotQueryGenerator.GeneratedPinotQuery> brokerPinotQuery,
            @JsonProperty("segmentPinotQuery") Optional<String> segmentPinotQuery,
            @JsonProperty("segments") List<String> segments,
            @JsonProperty("segmentHost") Optional<String> segmentHost,
            @JsonProperty("grpcPort") Optional<Integer> grpcPort)
    {
        this.connectorId = requireNonNull(connectorId, "connector id is null");
        this.splitType = requireNonNull(splitType, "splitType id is null");
        this.expectedColumnHandles = ImmutableList.copyOf(requireNonNull(expectedColumnHandles, "expected column handles is null"));
        this.brokerPinotQuery = requireNonNull(brokerPinotQuery, "brokerPinotQuery is null");
        this.segmentPinotQuery = requireNonNull(segmentPinotQuery, "segmentPinotQuery is null");
        this.segments = ImmutableList.copyOf(requireNonNull(segments, "segment is null"));
        this.segmentHost = requireNonNull(segmentHost, "host is null");
        this.grpcPort = requireNonNull(grpcPort, "grpcPort is null");

        // make sure the segment properties are present when the split type is segment
        if (splitType == SplitType.SEGMENT) {
            checkArgument(segmentPinotQuery.isPresent(), "segmentPinotQuery is missing from the split");
            checkArgument(!segments.isEmpty(), "Segments are missing from the split");
            checkArgument(segmentHost.isPresent(), "Segment host address is missing from the split");
        }
        else {
            checkArgument(brokerPinotQuery.isPresent(), "brokerPinotQuery is missing from the split");
        }
    }

    /**
     * Creates a {@link SplitType#BROKER} split carrying only the broker query;
     * all segment-related properties are left empty.
     *
     * @throws NullPointerException if any argument is null
     */
    public static PinotSplit createBrokerSplit(String connectorId, List<PinotColumnHandle> expectedColumnHandles, PinotQueryGenerator.GeneratedPinotQuery brokerQuery)
    {
        return new PinotSplit(
                requireNonNull(connectorId, "connector id is null"),
                SplitType.BROKER,
                expectedColumnHandles,
                Optional.of(requireNonNull(brokerQuery, "brokerQuery is null")),
                Optional.empty(),
                ImmutableList.of(),
                Optional.empty(),
                Optional.empty());
    }

    /**
     * Creates a {@link SplitType#SEGMENT} split; the broker query is left empty.
     *
     * @throws NullPointerException if any reference argument is null
     * @throws IllegalArgumentException if {@code segments} is empty
     */
    public static PinotSplit createSegmentSplit(String connectorId, String segmentPinotQuery, List<PinotColumnHandle> expectedColumnHandles, List<String> segments, String segmentHost, int grpcPort)
    {
        return new PinotSplit(
                requireNonNull(connectorId, "connector id is null"),
                SplitType.SEGMENT,
                expectedColumnHandles,
                Optional.empty(),
                Optional.of(requireNonNull(segmentPinotQuery, "segmentPinotQuery is null")),
                requireNonNull(segments, "segments are null"),
                Optional.of(requireNonNull(segmentHost, "segmentHost is null")),
                Optional.of(grpcPort));
    }

    /** Returns the connector id given at construction. */
    @JsonProperty
    public String getConnectorId()
    {
        return connectorId;
    }

    /** Returns the kind of this split. */
    @JsonProperty
    public SplitType getSplitType()
    {
        return splitType;
    }

    /** Returns the broker query; the constructor guarantees it is present for non-segment splits. */
    @JsonProperty
    public Optional<PinotQueryGenerator.GeneratedPinotQuery> getBrokerPinotQuery()
    {
        return brokerPinotQuery;
    }

    /** Returns the segment query; the constructor guarantees it is present for segment splits. */
    @JsonProperty
    public Optional<String> getSegmentPinotQuery()
    {
        return segmentPinotQuery;
    }

    /** Returns the raw segment host string; the constructor guarantees it is present for segment splits. */
    @JsonProperty
    public Optional<String> getSegmentHost()
    {
        return segmentHost;
    }

    /** Returns the immutable list of segment names; non-empty for segment splits. */
    @JsonProperty
    public List<String> getSegments()
    {
        return segments;
    }

    /**
     * Derives the gRPC host name from {@code segmentHost}; this getter is not annotated as a JSON property.
     * <p>
     * The host string is split on {@code '_'}. When that yields more than one piece, the
     * second-to-last piece is returned (e.g. {@code Server_127.0.0.1_8090} gives {@code 127.0.0.1});
     * otherwise the entire string is returned unchanged.
     *
     * @return the derived host name, or empty when {@code segmentHost} is absent
     */
    public Optional<String> getGrpcHost()
    {
        return segmentHost.map(host -> {
            String[] hostSplits = host.split("_");
            return (hostSplits.length > 1) ? hostSplits[hostSplits.length - 2] : host;
        });
    }

    /** Returns the optional gRPC port; never null. */
    @JsonProperty
    public Optional<Integer> getGrpcPort()
    {
        return grpcPort;
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .add("connectorId", connectorId)
                .add("splitType", splitType)
                .add("columnHandle", expectedColumnHandles)
                .add("segmentPinotQuery", segmentPinotQuery)
                .add("brokerPinotQuery", brokerPinotQuery)
                .add("segments", segments)
                .add("segmentHost", segmentHost)
                .add("grpcPort", grpcPort)
                .toString();
    }

    /** Returns the immutable list of column handles expected from this split. */
    @JsonProperty
    public List<PinotColumnHandle> getExpectedColumnHandles()
    {
        return expectedColumnHandles;
    }

    /** Always {@code NO_PREFERENCE}. */
    @Override
    public NodeSelectionStrategy getNodeSelectionStrategy()
    {
        return NO_PREFERENCE;
    }

    /** Always an empty list; {@code nodeProvider} is not consulted. */
    @Override
    public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
    {
        return ImmutableList.of();
    }

    /** Returns this split itself as its info object. */
    @Override
    public Object getInfo()
    {
        return this;
    }

    /** The two kinds of split this class can describe. */
    public enum SplitType
    {
        SEGMENT,
        BROKER,
    }
}