HiveSplit.java

/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.facebook.presto.hive;

import com.facebook.presto.hive.metastore.Storage;
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.Set;

import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.SOFT_AFFINITY;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;

public class HiveSplit
        implements ConnectorSplit
{
    private final HiveFileSplit fileSplit;
    private final Storage storage;
    private final List<HivePartitionKey> partitionKeys;
    private final List<HostAddress> addresses;
    private final String database;
    private final String table;
    private final String partitionName;
    private final OptionalInt readBucketNumber;
    private final OptionalInt tableBucketNumber;
    private final NodeSelectionStrategy nodeSelectionStrategy;
    private final int partitionDataColumnCount;
    private final TableToPartitionMapping tableToPartitionMapping;
    private final Optional<BucketConversion> bucketConversion;
    private final boolean s3SelectPushdownEnabled;
    private final CacheQuotaRequirement cacheQuotaRequirement;
    private final Optional<EncryptionInformation> encryptionInformation;
    private final Set<ColumnHandle> redundantColumnDomains;
    private final SplitWeight splitWeight;
    private final Optional<byte[]> rowIdPartitionComponent;

    @JsonCreator
    public HiveSplit(
            @JsonProperty("fileSplit") HiveFileSplit fileSplit,
            @JsonProperty("database") String database,
            @JsonProperty("table") String table,
            @JsonProperty("partitionName") String partitionName,
            @JsonProperty("storage") Storage storage,
            @JsonProperty("partitionKeys") List<HivePartitionKey> partitionKeys,
            @JsonProperty("addresses") List<HostAddress> addresses,
            @JsonProperty("readBucketNumber") OptionalInt readBucketNumber,
            @JsonProperty("tableBucketNumber") OptionalInt tableBucketNumber,
            @JsonProperty("nodeSelectionStrategy") NodeSelectionStrategy nodeSelectionStrategy,
            @JsonProperty("partitionDataColumnCount") int partitionDataColumnCount,
            @JsonProperty("tableToPartitionMapping") TableToPartitionMapping tableToPartitionMapping,
            @JsonProperty("bucketConversion") Optional<BucketConversion> bucketConversion,
            @JsonProperty("s3SelectPushdownEnabled") boolean s3SelectPushdownEnabled,
            @JsonProperty("cacheQuota") CacheQuotaRequirement cacheQuotaRequirement,
            @JsonProperty("encryptionMetadata") Optional<EncryptionInformation> encryptionInformation,
            @JsonProperty("redundantColumnDomains") Set<ColumnHandle> redundantColumnDomains,
            @JsonProperty("splitWeight") SplitWeight splitWeight,
            @JsonProperty("rowIdPartitionComponent") Optional<byte[]> rowIdPartitionComponent)
    {
        requireNonNull(fileSplit, "fileSplit is null");
        requireNonNull(database, "database is null");
        requireNonNull(table, "table is null");
        requireNonNull(partitionName, "partitionName is null");
        requireNonNull(storage, "storage is null");
        requireNonNull(partitionKeys, "partitionKeys is null");
        requireNonNull(addresses, "addresses is null");
        requireNonNull(readBucketNumber, "readBucketNumber is null");
        requireNonNull(tableBucketNumber, "tableBucketNumber is null");
        requireNonNull(nodeSelectionStrategy, "nodeSelectionStrategy is null");
        requireNonNull(tableToPartitionMapping, "tableToPartitionMapping is null");
        requireNonNull(bucketConversion, "bucketConversion is null");
        requireNonNull(cacheQuotaRequirement, "cacheQuotaRequirement is null");
        requireNonNull(encryptionInformation, "encryptionMetadata is null");
        requireNonNull(redundantColumnDomains, "redundantColumnDomains is null");
        requireNonNull(rowIdPartitionComponent, "rowIdPartitionComponent is null");

        this.fileSplit = fileSplit;
        this.database = database;
        this.table = table;
        this.partitionName = partitionName;
        this.storage = storage;
        this.partitionKeys = ImmutableList.copyOf(partitionKeys);
        this.addresses = ImmutableList.copyOf(addresses);
        this.readBucketNumber = readBucketNumber;
        this.tableBucketNumber = tableBucketNumber;
        this.nodeSelectionStrategy = nodeSelectionStrategy;
        this.partitionDataColumnCount = partitionDataColumnCount;
        this.tableToPartitionMapping = tableToPartitionMapping;
        this.bucketConversion = bucketConversion;
        this.s3SelectPushdownEnabled = s3SelectPushdownEnabled;
        this.cacheQuotaRequirement = cacheQuotaRequirement;
        this.encryptionInformation = encryptionInformation;
        this.redundantColumnDomains = ImmutableSet.copyOf(redundantColumnDomains);
        this.splitWeight = requireNonNull(splitWeight, "splitWeight is null");
        this.rowIdPartitionComponent = rowIdPartitionComponent;
    }

    @JsonProperty
    public HiveFileSplit getFileSplit()
    {
        return fileSplit;
    }

    @JsonProperty
    public String getDatabase()
    {
        return database;
    }

    @JsonProperty
    public String getTable()
    {
        return table;
    }

    @JsonProperty
    public String getPartitionName()
    {
        return partitionName;
    }

    @JsonProperty
    public Storage getStorage()
    {
        return storage;
    }

    @JsonProperty
    public List<HivePartitionKey> getPartitionKeys()
    {
        return partitionKeys;
    }

    @JsonProperty
    public List<HostAddress> getAddresses()
    {
        return addresses;
    }

    @Override
    public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
    {
        if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
            return nodeProvider.get(fileSplit.getPath() + "#" + fileSplit.getAffinitySchedulingFileSectionIndex());
        }
        return addresses;
    }

    @JsonProperty
    public OptionalInt getReadBucketNumber()
    {
        return readBucketNumber;
    }

    @JsonProperty
    public OptionalInt getTableBucketNumber()
    {
        return tableBucketNumber;
    }

    @JsonProperty
    public int getPartitionDataColumnCount()
    {
        return partitionDataColumnCount;
    }

    @JsonProperty
    public TableToPartitionMapping getTableToPartitionMapping()
    {
        return tableToPartitionMapping;
    }

    @JsonProperty
    public Optional<BucketConversion> getBucketConversion()
    {
        return bucketConversion;
    }

    @JsonProperty
    @Override
    public NodeSelectionStrategy getNodeSelectionStrategy()
    {
        return nodeSelectionStrategy;
    }

    @JsonProperty
    public boolean isS3SelectPushdownEnabled()
    {
        return s3SelectPushdownEnabled;
    }

    @JsonProperty
    public CacheQuotaRequirement getCacheQuotaRequirement()
    {
        return cacheQuotaRequirement;
    }

    @JsonProperty
    public Optional<EncryptionInformation> getEncryptionInformation()
    {
        return encryptionInformation;
    }

    @JsonProperty
    public Set<ColumnHandle> getRedundantColumnDomains()
    {
        return redundantColumnDomains;
    }

    @JsonProperty
    @Override
    public SplitWeight getSplitWeight()
    {
        return splitWeight;
    }

    @JsonProperty
    public Optional<byte[]> getRowIdPartitionComponent()
    {
        return rowIdPartitionComponent;
    }

    @Override
    public Object getInfo()
    {
        return ImmutableMap.builder()
                .put("path", fileSplit.getPath())
                .put("start", fileSplit.getStart())
                .put("length", fileSplit.getLength())
                .put("fileSize", fileSplit.getFileSize())
                .put("fileModifiedTime", fileSplit.getFileModifiedTime())
                .put("hosts", addresses)
                .put("database", database)
                .put("table", table)
                .put("nodeSelectionStrategy", nodeSelectionStrategy)
                .put("partitionName", partitionName)
                .put("s3SelectPushdownEnabled", s3SelectPushdownEnabled)
                .put("cacheQuotaRequirement", cacheQuotaRequirement)
                .build();
    }

    @Override
    public Map<String, String> getInfoMap()
    {
        return ImmutableMap.<String, String>builder()
                .put("path", fileSplit.getPath())
                .put("start", Long.toString(fileSplit.getStart()))
                .put("length", Long.toString(fileSplit.getLength()))
                .put("fileSize", Long.toString(fileSplit.getFileSize()))
                .put("fileModifiedTime", Long.toString(fileSplit.getFileModifiedTime()))
                .put("hosts", addresses.toString())
                .put("database", database)
                .put("table", table)
                .put("nodeSelectionStrategy", nodeSelectionStrategy.toString())
                .put("partitionName", partitionName)
                .put("s3SelectPushdownEnabled", Boolean.toString(s3SelectPushdownEnabled))
                .put("cacheQuotaRequirement", cacheQuotaRequirement.toString())
                .put("readBucketNumber", readBucketNumber.toString())
                .put("tableBucketNumber", tableBucketNumber.toString())
                .build();
    }

    @Override
    public Object getSplitIdentifier()
    {
        return ImmutableMap.builder()
                .put("path", fileSplit.getPath())
                .put("start", fileSplit.getStart())
                .put("length", fileSplit.getLength())
                .build();
    }

    @Override
    public OptionalLong getSplitSizeInBytes()
    {
        return OptionalLong.of(fileSplit.getLength());
    }

    @Override
    public String toString()
    {
        return toStringHelper(this)
                .addValue(fileSplit.getPath())
                .addValue(fileSplit.getStart())
                .addValue(fileSplit.getLength())
                .addValue(fileSplit.getFileSize())
                .addValue(s3SelectPushdownEnabled)
                .addValue(cacheQuotaRequirement)
                .toString();
    }

    public static class BucketConversion
    {
        private final int tableBucketCount;
        private final int partitionBucketCount;
        private final List<HiveColumnHandle> bucketColumnNames;
        // tableBucketNumber is needed, but can be found in tableBucketNumber field of HiveSplit.

        @JsonCreator
        public BucketConversion(
                @JsonProperty("tableBucketCount") int tableBucketCount,
                @JsonProperty("partitionBucketCount") int partitionBucketCount,
                @JsonProperty("bucketColumnHandles") List<HiveColumnHandle> bucketColumnHandles)
        {
            this.tableBucketCount = tableBucketCount;
            this.partitionBucketCount = partitionBucketCount;
            this.bucketColumnNames = requireNonNull(bucketColumnHandles, "bucketColumnHandles is null");
        }

        @JsonProperty
        public int getTableBucketCount()
        {
            return tableBucketCount;
        }

        @JsonProperty
        public int getPartitionBucketCount()
        {
            return partitionBucketCount;
        }

        @JsonProperty
        public List<HiveColumnHandle> getBucketColumnHandles()
        {
            return bucketColumnNames;
        }

        @Override
        public boolean equals(Object o)
        {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            BucketConversion that = (BucketConversion) o;
            return tableBucketCount == that.tableBucketCount &&
                    partitionBucketCount == that.partitionBucketCount &&
                    Objects.equals(bucketColumnNames, that.bucketColumnNames);
        }

        @Override
        public int hashCode()
        {
            return Objects.hash(tableBucketCount, partitionBucketCount, bucketColumnNames);
        }
    }
}